1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/shared_memory_arbiter_impl.h"
18 
19 #include <bitset>
20 #include "perfetto/ext/base/utils.h"
21 #include "perfetto/ext/tracing/core/basic_types.h"
22 #include "perfetto/ext/tracing/core/commit_data_request.h"
23 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
24 #include "perfetto/ext/tracing/core/trace_packet.h"
25 #include "perfetto/ext/tracing/core/trace_writer.h"
26 #include "src/base/test/gtest_test_suite.h"
27 #include "src/base/test/test_task_runner.h"
28 #include "src/tracing/core/patch_list.h"
29 #include "src/tracing/test/aligned_buffer_test.h"
30 #include "src/tracing/test/fake_producer_endpoint.h"
31 #include "test/gtest_and_gmock.h"
32 
33 #include "protos/perfetto/trace/test_event.pbzero.h"
34 #include "protos/perfetto/trace/trace_packet.pbzero.h"
35 
36 namespace perfetto {
37 
38 using testing::_;
39 using testing::Invoke;
40 using testing::Mock;
41 
42 class MockProducerEndpoint : public TracingService::ProducerEndpoint {
43  public:
RegisterDataSource(const DataSourceDescriptor &)44   void RegisterDataSource(const DataSourceDescriptor&) override {}
UnregisterDataSource(const std::string &)45   void UnregisterDataSource(const std::string&) override {}
NotifyFlushComplete(FlushRequestID)46   void NotifyFlushComplete(FlushRequestID) override {}
NotifyDataSourceStarted(DataSourceInstanceID)47   void NotifyDataSourceStarted(DataSourceInstanceID) override {}
NotifyDataSourceStopped(DataSourceInstanceID)48   void NotifyDataSourceStopped(DataSourceInstanceID) override {}
ActivateTriggers(const std::vector<std::string> &)49   void ActivateTriggers(const std::vector<std::string>&) {}
Sync(std::function<void ()>)50   void Sync(std::function<void()>) override {}
shared_memory() const51   SharedMemory* shared_memory() const override { return nullptr; }
shared_buffer_page_size_kb() const52   size_t shared_buffer_page_size_kb() const override { return 0; }
CreateTraceWriter(BufferID,BufferExhaustedPolicy)53   std::unique_ptr<TraceWriter> CreateTraceWriter(
54       BufferID,
55       BufferExhaustedPolicy) override {
56     return nullptr;
57   }
MaybeSharedMemoryArbiter()58   SharedMemoryArbiter* MaybeSharedMemoryArbiter() override { return nullptr; }
IsShmemProvidedByProducer() const59   bool IsShmemProvidedByProducer() const override { return false; }
60 
61   MOCK_METHOD2(CommitData, void(const CommitDataRequest&, CommitDataCallback));
62   MOCK_METHOD2(RegisterTraceWriter, void(uint32_t, uint32_t));
63   MOCK_METHOD1(UnregisterTraceWriter, void(uint32_t));
64 };
65 
66 class SharedMemoryArbiterImplTest : public AlignedBufferTest {
67  public:
SetUp()68   void SetUp() override {
69     AlignedBufferTest::SetUp();
70     task_runner_.reset(new base::TestTaskRunner());
71     arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
72                                                &mock_producer_endpoint_,
73                                                task_runner_.get()));
74   }
75 
IsArbiterFullyBound()76   bool IsArbiterFullyBound() { return arbiter_->fully_bound_; }
77 
TearDown()78   void TearDown() override {
79     arbiter_.reset();
80     task_runner_.reset();
81   }
82 
83   std::unique_ptr<base::TestTaskRunner> task_runner_;
84   std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
85   MockProducerEndpoint mock_producer_endpoint_;
86   std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
87 };
88 
89 size_t const kPageSizes[] = {4096, 65536};
90 INSTANTIATE_TEST_SUITE_P(PageSize,
91                          SharedMemoryArbiterImplTest,
92                          ::testing::ValuesIn(kPageSizes));
93 
94 // The buffer has 14 pages (kNumPages), each will be partitioned in 14 chunks.
95 // The test requests 30 chunks (2 full pages + 2 chunks from a 3rd page) and
96 // releases them in different batches. It tests the consistency of the batches
97 // and the releasing order.
TEST_P(SharedMemoryArbiterImplTest,GetAndReturnChunks)98 TEST_P(SharedMemoryArbiterImplTest, GetAndReturnChunks) {
99   SharedMemoryArbiterImpl::set_default_layout_for_testing(
100       SharedMemoryABI::PageLayout::kPageDiv14);
101   static constexpr size_t kTotChunks = kNumPages * 14;
102   SharedMemoryABI::Chunk chunks[kTotChunks];
103   for (size_t i = 0; i < 14 * 2 + 2; i++) {
104     chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kStall);
105     ASSERT_TRUE(chunks[i].is_valid());
106   }
107 
108   // Finally return the first 28 chunks (full 2 pages) and only the 2nd chunk of
109   // the 2rd page. Chunks are release in interleaved order: 1,0,3,2,5,4,7,6.
110   // Check that the notification callback is posted and order is consistent.
111   auto on_commit_1 = task_runner_->CreateCheckpoint("on_commit_1");
112   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
113       .WillOnce(Invoke([on_commit_1](const CommitDataRequest& req,
114                                      MockProducerEndpoint::CommitDataCallback) {
115         ASSERT_EQ(14 * 2 + 1, req.chunks_to_move_size());
116         for (size_t i = 0; i < 14 * 2; i++) {
117           ASSERT_EQ(i / 14, req.chunks_to_move()[i].page());
118           ASSERT_EQ((i % 14) ^ 1, req.chunks_to_move()[i].chunk());
119           ASSERT_EQ(i % 5 + 1, req.chunks_to_move()[i].target_buffer());
120         }
121         ASSERT_EQ(2u, req.chunks_to_move()[28].page());
122         ASSERT_EQ(1u, req.chunks_to_move()[28].chunk());
123         ASSERT_EQ(42u, req.chunks_to_move()[28].target_buffer());
124         on_commit_1();
125       }));
126   PatchList ignored;
127   for (size_t i = 0; i < 14 * 2; i++) {
128     arbiter_->ReturnCompletedChunk(std::move(chunks[i ^ 1]), i % 5 + 1,
129                                    &ignored);
130   }
131   arbiter_->ReturnCompletedChunk(std::move(chunks[29]), 42, &ignored);
132   task_runner_->RunUntilCheckpoint("on_commit_1");
133 
134   // Then release the 1st chunk of the 3rd page, and check that we get a
135   // notification for that as well.
136   auto on_commit_2 = task_runner_->CreateCheckpoint("on_commit_2");
137   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
138       .WillOnce(Invoke([on_commit_2](const CommitDataRequest& req,
139                                      MockProducerEndpoint::CommitDataCallback) {
140         ASSERT_EQ(1, req.chunks_to_move_size());
141         ASSERT_EQ(2u, req.chunks_to_move()[0].page());
142         ASSERT_EQ(0u, req.chunks_to_move()[0].chunk());
143         ASSERT_EQ(43u, req.chunks_to_move()[0].target_buffer());
144         on_commit_2();
145       }));
146   arbiter_->ReturnCompletedChunk(std::move(chunks[28]), 43, &ignored);
147   task_runner_->RunUntilCheckpoint("on_commit_2");
148 }
149 
TEST_P(SharedMemoryArbiterImplTest,BatchCommits)150 TEST_P(SharedMemoryArbiterImplTest, BatchCommits) {
151   SharedMemoryArbiterImpl::set_default_layout_for_testing(
152       SharedMemoryABI::PageLayout::kPageDiv1);
153 
154   // Batching period is 0s - chunks are being committed as soon as they are
155   // returned.
156   SharedMemoryABI::Chunk chunk =
157       arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
158   ASSERT_TRUE(chunk.is_valid());
159   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(1);
160   PatchList ignored;
161   arbiter_->ReturnCompletedChunk(std::move(chunk), 0, &ignored);
162   task_runner_->RunUntilIdle();
163   ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));
164 
165   // Since we cannot explicitly control the passage of time in task_runner_, to
166   // simulate a non-zero batching period and a commit at the end of it, set the
167   // batching duration to a very large value and call
168   // FlushPendingCommitDataRequests to manually trigger the commit.
169   arbiter_->SetDirectSMBPatchingSupportedByService();
170   ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
171   arbiter_->SetBatchCommitsDuration(UINT32_MAX);
172 
173   // First chunk that will be batched. CommitData should not be called
174   // immediately this time.
175   chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
176   ASSERT_TRUE(chunk.is_valid());
177   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(0);
178   // We'll pretend that the chunk needs patching. This is done in order to
179   // verify that chunks that need patching are not marked as complete (i.e. they
180   // are kept in state kChunkBeingWritten) before the batching period ends - in
181   // case a patch for them arrives during the batching period.
182   chunk.SetFlag(SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
183   arbiter_->ReturnCompletedChunk(std::move(chunk), 1, &ignored);
184   task_runner_->RunUntilIdle();
185   ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));
186   ASSERT_EQ(SharedMemoryABI::kChunkBeingWritten,
187             arbiter_->shmem_abi_for_testing()->GetChunkState(1u, 0u));
188 
189   // Add a second chunk to the batch. This should also not trigger an immediate
190   // call to CommitData.
191   chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
192   ASSERT_TRUE(chunk.is_valid());
193   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(0);
194   arbiter_->ReturnCompletedChunk(std::move(chunk), 2, &ignored);
195   task_runner_->RunUntilIdle();
196   ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));
197   // This chunk does not need patching, so it should be marked as complete even
198   // before the end of the batching period - to allow the service to read it in
199   // full.
200   ASSERT_EQ(SharedMemoryABI::kChunkComplete,
201             arbiter_->shmem_abi_for_testing()->GetChunkState(2u, 0u));
202 
203   // Make sure that CommitData gets called once (should happen at the end
204   // of the batching period), with the two chunks in the batch.
205   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
206       .WillOnce(Invoke([](const CommitDataRequest& req,
207                           MockProducerEndpoint::CommitDataCallback) {
208         ASSERT_EQ(2, req.chunks_to_move_size());
209 
210         // Verify that this is the first chunk that we expect to have been
211         // batched.
212         ASSERT_EQ(1u, req.chunks_to_move()[0].page());
213         ASSERT_EQ(0u, req.chunks_to_move()[0].chunk());
214         ASSERT_EQ(1u, req.chunks_to_move()[0].target_buffer());
215 
216         // Verify that this is the second chunk that we expect to have been
217         // batched.
218         ASSERT_EQ(2u, req.chunks_to_move()[1].page());
219         ASSERT_EQ(0u, req.chunks_to_move()[1].chunk());
220         ASSERT_EQ(2u, req.chunks_to_move()[1].target_buffer());
221       }));
222 
223   // Pretend we've reached the end of the batching period.
224   arbiter_->FlushPendingCommitDataRequests();
225 }
226 
227 // Helper for verifying trace writer id allocations.
228 class TraceWriterIdChecker : public FakeProducerEndpoint {
229  public:
TraceWriterIdChecker(std::function<void ()> checkpoint)230   TraceWriterIdChecker(std::function<void()> checkpoint)
231       : checkpoint_(std::move(checkpoint)) {}
232 
RegisterTraceWriter(uint32_t id,uint32_t)233   void RegisterTraceWriter(uint32_t id, uint32_t) override {
234     EXPECT_GT(id, 0u);
235     EXPECT_LE(id, kMaxWriterID);
236     if (id > 0 && id <= kMaxWriterID) {
237       registered_ids_.set(id - 1);
238     }
239   }
240 
UnregisterTraceWriter(uint32_t id)241   void UnregisterTraceWriter(uint32_t id) override {
242     if (++unregister_calls_ == kMaxWriterID)
243       checkpoint_();
244 
245     EXPECT_GT(id, 0u);
246     EXPECT_LE(id, kMaxWriterID);
247     if (id > 0 && id <= kMaxWriterID) {
248       unregistered_ids_.set(id - 1);
249     }
250   }
251 
252   // bit N corresponds to id N+1
253   std::bitset<kMaxWriterID> registered_ids_;
254   std::bitset<kMaxWriterID> unregistered_ids_;
255 
256   int unregister_calls_ = 0;
257 
258  private:
259   std::function<void()> checkpoint_;
260 };
261 
262 // Check that we can actually create up to kMaxWriterID TraceWriter(s).
TEST_P(SharedMemoryArbiterImplTest,WriterIDsAllocation)263 TEST_P(SharedMemoryArbiterImplTest, WriterIDsAllocation) {
264   auto checkpoint = task_runner_->CreateCheckpoint("last_unregistered");
265 
266   TraceWriterIdChecker id_checking_endpoint(checkpoint);
267   arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
268                                              &id_checking_endpoint,
269                                              task_runner_.get()));
270   {
271     std::map<WriterID, std::unique_ptr<TraceWriter>> writers;
272 
273     for (size_t i = 0; i < kMaxWriterID; i++) {
274       std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
275       ASSERT_TRUE(writer);
276       WriterID writer_id = writer->writer_id();
277       ASSERT_TRUE(writers.emplace(writer_id, std::move(writer)).second);
278     }
279 
280     // A further call should return a null impl of trace writer as we exhausted
281     // writer IDs.
282     ASSERT_EQ(arbiter_->CreateTraceWriter(1)->writer_id(), 0);
283   }
284 
285   // This should run the Register/UnregisterTraceWriter tasks enqueued by the
286   // memory arbiter.
287   task_runner_->RunUntilCheckpoint("last_unregistered", 15000);
288 
289   EXPECT_TRUE(id_checking_endpoint.registered_ids_.all());
290   EXPECT_TRUE(id_checking_endpoint.unregistered_ids_.all());
291 }
292 
TEST_P(SharedMemoryArbiterImplTest,Shutdown)293 TEST_P(SharedMemoryArbiterImplTest, Shutdown) {
294   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
295   EXPECT_TRUE(writer);
296   EXPECT_FALSE(arbiter_->TryShutdown());
297 
298   // We still get a valid trace writer after shutdown, but it's a null one
299   // that's not connected to the arbiter.
300   std::unique_ptr<TraceWriter> writer2 = arbiter_->CreateTraceWriter(2);
301   EXPECT_TRUE(writer2);
302   EXPECT_EQ(writer2->writer_id(), 0);
303 
304   // Shutdown will succeed once the only non-null writer goes away.
305   writer.reset();
306   EXPECT_TRUE(arbiter_->TryShutdown());
307 }
308 
309 // Verify that getting a new chunk doesn't stall when kDrop policy is chosen.
TEST_P(SharedMemoryArbiterImplTest,BufferExhaustedPolicyDrop)310 TEST_P(SharedMemoryArbiterImplTest, BufferExhaustedPolicyDrop) {
311   // Grab all chunks in the SMB.
312   SharedMemoryArbiterImpl::set_default_layout_for_testing(
313       SharedMemoryABI::PageLayout::kPageDiv1);
314   static constexpr size_t kTotChunks = kNumPages;
315   SharedMemoryABI::Chunk chunks[kTotChunks];
316   for (size_t i = 0; i < kTotChunks; i++) {
317     chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
318     ASSERT_TRUE(chunks[i].is_valid());
319   }
320 
321   // SMB is exhausted, thus GetNewChunk() should return an invalid chunk. In
322   // kStall mode, this would stall.
323   SharedMemoryABI::Chunk invalid_chunk =
324       arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
325   ASSERT_FALSE(invalid_chunk.is_valid());
326 
327   // Returning the chunk is not enough to be able to reacquire it.
328   PatchList ignored;
329   arbiter_->ReturnCompletedChunk(std::move(chunks[0]), 1, &ignored);
330 
331   invalid_chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
332   ASSERT_FALSE(invalid_chunk.is_valid());
333 
334   // After releasing the chunk as free, we can reacquire it.
335   chunks[0] =
336       arbiter_->shmem_abi_for_testing()->TryAcquireChunkForReading(0, 0);
337   ASSERT_TRUE(chunks[0].is_valid());
338   arbiter_->shmem_abi_for_testing()->ReleaseChunkAsFree(std::move(chunks[0]));
339 
340   chunks[0] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
341   ASSERT_TRUE(chunks[0].is_valid());
342 }
343 
TEST_P(SharedMemoryArbiterImplTest,CreateUnboundAndBind)344 TEST_P(SharedMemoryArbiterImplTest, CreateUnboundAndBind) {
345   auto checkpoint_writer = task_runner_->CreateCheckpoint("writer_registered");
346   auto checkpoint_flush = task_runner_->CreateCheckpoint("flush_completed");
347 
348   // Create an unbound arbiter and bind immediately.
349   arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
350                                              nullptr, nullptr));
351   arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
352                                    task_runner_.get());
353   EXPECT_TRUE(IsArbiterFullyBound());
354 
355   // Trace writer should be registered in a non-delayed task.
356   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 1))
357       .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
358   std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
359   task_runner_->RunUntilCheckpoint("writer_registered", 5000);
360 
361   // Commits/flushes should be sent right away.
362   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
363       .WillOnce(testing::InvokeArgument<1>());
364   writer->Flush(checkpoint_flush);
365   task_runner_->RunUntilCheckpoint("flush_completed", 5000);
366 }
367 
TEST_P(SharedMemoryArbiterImplTest,StartupTracing)368 TEST_P(SharedMemoryArbiterImplTest, StartupTracing) {
369   constexpr uint16_t kTargetBufferReservationId1 = 1;
370   constexpr uint16_t kTargetBufferReservationId2 = 2;
371 
372   // Create an unbound arbiter and a startup writer.
373   arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
374                                              nullptr, nullptr));
375   std::unique_ptr<TraceWriter> writer =
376       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
377 
378   // Write two packet while unbound and flush the chunk after each packet. The
379   // writer will return the chunk to the arbiter and grab a new chunk for the
380   // second packet. The flush should only add the chunk into the queued commit
381   // request.
382   for (int i = 0; i < 2; i++) {
383     {
384       auto packet = writer->NewTracePacket();
385       packet->set_for_testing()->set_str("foo");
386     }
387     writer->Flush();
388   }
389 
390   // Bind to producer endpoint. This should not register the trace writer yet,
391   // because it's buffer reservation is still unbound.
392   arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
393                                    task_runner_.get());
394   EXPECT_FALSE(IsArbiterFullyBound());
395 
396   // Write another packet into another chunk and queue it.
397   {
398     auto packet = writer->NewTracePacket();
399     packet->set_for_testing()->set_str("foo");
400   }
401   bool flush_completed = false;
402   writer->Flush([&flush_completed] { flush_completed = true; });
403 
404   // Bind the buffer reservation to a buffer. Trace writer should be registered
405   // and queued commits flushed.
406   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42));
407   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
408       .WillOnce(Invoke([](const CommitDataRequest& req,
409                           MockProducerEndpoint::CommitDataCallback callback) {
410         ASSERT_EQ(3, req.chunks_to_move_size());
411         EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
412         EXPECT_EQ(42u, req.chunks_to_move()[1].target_buffer());
413         EXPECT_EQ(42u, req.chunks_to_move()[2].target_buffer());
414         callback();
415       }));
416 
417   arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId1, 42);
418   EXPECT_TRUE(IsArbiterFullyBound());
419 
420   testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
421   EXPECT_TRUE(flush_completed);
422 
423   // Creating a new startup writer for the same buffer posts an immediate task
424   // to register it.
425   auto checkpoint_register1b =
426       task_runner_->CreateCheckpoint("writer1b_registered");
427   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42))
428       .WillOnce(testing::InvokeWithoutArgs(checkpoint_register1b));
429   std::unique_ptr<TraceWriter> writer1b =
430       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
431   task_runner_->RunUntilCheckpoint("writer1b_registered", 5000);
432 
433   // And a commit on this new writer should be flushed to the right buffer, too.
434   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
435       .WillOnce(Invoke([](const CommitDataRequest& req,
436                           MockProducerEndpoint::CommitDataCallback callback) {
437         ASSERT_EQ(1, req.chunks_to_move_size());
438         EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
439         callback();
440       }));
441   {
442     auto packet = writer1b->NewTracePacket();
443     packet->set_for_testing()->set_str("foo");
444   }
445   flush_completed = false;
446   writer1b->Flush([&flush_completed] { flush_completed = true; });
447 
448   testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
449   EXPECT_TRUE(flush_completed);
450 
451   // Create another startup writer for another target buffer, which puts the
452   // arbiter back into unbound state.
453   std::unique_ptr<TraceWriter> writer2 =
454       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
455   EXPECT_FALSE(IsArbiterFullyBound());
456 
457   // Write a chunk into both writers. Both should be queued up into the next
458   // commit request.
459   {
460     auto packet = writer->NewTracePacket();
461     packet->set_for_testing()->set_str("foo");
462   }
463   writer->Flush();
464   {
465     auto packet = writer2->NewTracePacket();
466     packet->set_for_testing()->set_str("bar");
467   }
468   flush_completed = false;
469   writer2->Flush([&flush_completed] { flush_completed = true; });
470 
471   // Destroy the first trace writer, which should cause the arbiter to post a
472   // task to unregister it.
473   auto checkpoint_writer =
474       task_runner_->CreateCheckpoint("writer_unregistered");
475   EXPECT_CALL(mock_producer_endpoint_,
476               UnregisterTraceWriter(writer->writer_id()))
477       .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
478   writer.reset();
479   task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);
480 
481   // Bind the second buffer reservation to a buffer. Second trace writer should
482   // be registered and queued commits flushed.
483   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 23));
484   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
485       .WillOnce(Invoke([](const CommitDataRequest& req,
486                           MockProducerEndpoint::CommitDataCallback callback) {
487         ASSERT_EQ(2, req.chunks_to_move_size());
488         EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
489         EXPECT_EQ(23u, req.chunks_to_move()[1].target_buffer());
490         callback();
491       }));
492 
493   arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId2, 23);
494   EXPECT_TRUE(IsArbiterFullyBound());
495 
496   testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
497   EXPECT_TRUE(flush_completed);
498 }
499 
TEST_P(SharedMemoryArbiterImplTest,AbortStartupTracingForReservation)500 TEST_P(SharedMemoryArbiterImplTest, AbortStartupTracingForReservation) {
501   constexpr uint16_t kTargetBufferReservationId1 = 1;
502   constexpr uint16_t kTargetBufferReservationId2 = 2;
503 
504   // Create an unbound arbiter and a startup writer.
505   arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
506                                              nullptr, nullptr));
507   SharedMemoryABI* shmem_abi = arbiter_->shmem_abi_for_testing();
508   std::unique_ptr<TraceWriter> writer =
509       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
510   std::unique_ptr<TraceWriter> writer2 =
511       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
512 
513   // Write two packet while unbound and flush the chunk after each packet. The
514   // writer will return the chunk to the arbiter and grab a new chunk for the
515   // second packet. The flush should only add the chunk into the queued commit
516   // request.
517   for (int i = 0; i < 2; i++) {
518     {
519       auto packet = writer->NewTracePacket();
520       packet->set_for_testing()->set_str("foo");
521     }
522     writer->Flush();
523   }
524 
525   // Abort the first session. This should clear resolve the two chunks committed
526   // up to this point to an invalid target buffer (ID 0). They will remain
527   // buffered until bound to an endpoint.
528   arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId1);
529 
530   // Destroy a writer that was created before the abort. This should not cause
531   // crashes.
532   writer2.reset();
533 
534   // Bind to producer endpoint. The trace writer should not be registered as its
535   // target buffer is invalid. Since no startup sessions are active anymore, the
536   // arbiter should be fully bound. The commit data request is flushed.
537   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
538   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
539       .WillOnce(Invoke([shmem_abi](const CommitDataRequest& req,
540                                    MockProducerEndpoint::CommitDataCallback) {
541         ASSERT_EQ(2, req.chunks_to_move_size());
542         for (size_t i = 0; i < 2; i++) {
543           EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
544           SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
545               req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
546           shmem_abi->ReleaseChunkAsFree(std::move(chunk));
547         }
548       }));
549   arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
550                                    task_runner_.get());
551   EXPECT_TRUE(IsArbiterFullyBound());
552 
553   // SMB should be free again, as no writer holds on to any chunk anymore.
554   for (size_t i = 0; i < shmem_abi->num_pages(); i++)
555     EXPECT_TRUE(shmem_abi->is_page_free(i));
556 
557   // Write another packet into another chunk and commit it. It should be sent
558   // to the arbiter with invalid target buffer (ID 0).
559   {
560     auto packet = writer->NewTracePacket();
561     packet->set_for_testing()->set_str("foo");
562   }
563   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
564       .WillOnce(Invoke([shmem_abi](
565                            const CommitDataRequest& req,
566                            MockProducerEndpoint::CommitDataCallback callback) {
567         ASSERT_EQ(1, req.chunks_to_move_size());
568         EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
569         SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
570             req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
571         shmem_abi->ReleaseChunkAsFree(std::move(chunk));
572         callback();
573       }));
574   bool flush_completed = false;
575   writer->Flush([&flush_completed] { flush_completed = true; });
576   EXPECT_TRUE(flush_completed);
577 
578   // Creating a new startup writer for the same buffer does not cause it to
579   // register.
580   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
581   std::unique_ptr<TraceWriter> writer1b =
582       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
583 
584   // And a commit on this new writer should again be flushed to the invalid
585   // target buffer.
586   {
587     auto packet = writer1b->NewTracePacket();
588     packet->set_for_testing()->set_str("foo");
589   }
590   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
591       .WillOnce(Invoke([shmem_abi](
592                            const CommitDataRequest& req,
593                            MockProducerEndpoint::CommitDataCallback callback) {
594         ASSERT_EQ(1, req.chunks_to_move_size());
595         EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
596         SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
597             req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
598         shmem_abi->ReleaseChunkAsFree(std::move(chunk));
599         callback();
600       }));
601   flush_completed = false;
602   writer1b->Flush([&flush_completed] { flush_completed = true; });
603   EXPECT_TRUE(flush_completed);
604 
605   // Create another startup writer for another target buffer, which puts the
606   // arbiter back into unbound state.
607   std::unique_ptr<TraceWriter> writer3 =
608       arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
609   EXPECT_FALSE(IsArbiterFullyBound());
610 
611   // Write a chunk into both writers. Both should be queued up into the next
612   // commit request.
613   {
614     auto packet = writer->NewTracePacket();
615     packet->set_for_testing()->set_str("foo");
616   }
617   writer->Flush();
618   {
619     auto packet = writer3->NewTracePacket();
620     packet->set_for_testing()->set_str("bar");
621   }
622   flush_completed = false;
623   writer3->Flush([&flush_completed] { flush_completed = true; });
624 
625   // Destroy the first trace writer, which should cause the arbiter to post a
626   // task to unregister it.
627   auto checkpoint_writer =
628       task_runner_->CreateCheckpoint("writer_unregistered");
629   EXPECT_CALL(mock_producer_endpoint_,
630               UnregisterTraceWriter(writer->writer_id()))
631       .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
632   writer.reset();
633   task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);
634 
635   // Abort the second session. Its commits should now also be associated with
636   // target buffer 0, and both writers' commits flushed.
637   EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
638   EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
639       .WillOnce(Invoke([shmem_abi](
640                            const CommitDataRequest& req,
641                            MockProducerEndpoint::CommitDataCallback callback) {
642         ASSERT_EQ(2, req.chunks_to_move_size());
643         for (size_t i = 0; i < 2; i++) {
644           EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
645           SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
646               req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
647           shmem_abi->ReleaseChunkAsFree(std::move(chunk));
648         }
649         callback();
650       }));
651 
652   arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId2);
653   EXPECT_TRUE(IsArbiterFullyBound());
654   EXPECT_TRUE(flush_completed);
655 
656   // SMB should be free again, as no writer holds on to any chunk anymore.
657   for (size_t i = 0; i < shmem_abi->num_pages(); i++)
658     EXPECT_TRUE(shmem_abi->is_page_free(i));
659 }
660 
661 // TODO(primiano): add multi-threaded tests.
662 
663 }  // namespace perfetto
664