/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include <bitset>
#include <map>
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/tracing/core/basic_types.h"
#include "perfetto/ext/tracing/core/commit_data_request.h"
#include "perfetto/ext/tracing/core/shared_memory_abi.h"
#include "perfetto/ext/tracing/core/trace_packet.h"
#include "perfetto/ext/tracing/core/trace_writer.h"
#include "src/base/test/gtest_test_suite.h"
#include "src/base/test/test_task_runner.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/test/aligned_buffer_test.h"
#include "src/tracing/test/fake_producer_endpoint.h"
#include "test/gtest_and_gmock.h"

#include "protos/perfetto/trace/test_event.pbzero.h"
#include "protos/perfetto/trace/trace_packet.pbzero.h"

namespace perfetto {

using testing::_;
using testing::Invoke;
using testing::Mock;

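// Mocks the service-side producer endpoint. Only CommitData and trace writer
// (un)registration are intercepted via GMock; every other ProducerEndpoint
// method is an inert stub.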
class MockProducerEndpoint : public TracingService::ProducerEndpoint {
 public:
  void RegisterDataSource(const DataSourceDescriptor&) override {}
  void UnregisterDataSource(const std::string&) override {}
  void NotifyFlushComplete(FlushRequestID) override {}
  void NotifyDataSourceStarted(DataSourceInstanceID) override {}
  void NotifyDataSourceStopped(DataSourceInstanceID) override {}
  void ActivateTriggers(const std::vector<std::string>&) override {}
  void Sync(std::function<void()>) override {}
  SharedMemory* shared_memory() const override { return nullptr; }
  size_t shared_buffer_page_size_kb() const override { return 0; }
  std::unique_ptr<TraceWriter> CreateTraceWriter(
      BufferID,
      BufferExhaustedPolicy) override {
    return nullptr;
  }
  SharedMemoryArbiter* MaybeSharedMemoryArbiter() override { return nullptr; }
  bool IsShmemProvidedByProducer() const override { return false; }

  MOCK_METHOD2(CommitData, void(const CommitDataRequest&, CommitDataCallback));
  MOCK_METHOD2(RegisterTraceWriter, void(uint32_t, uint32_t));
  MOCK_METHOD1(UnregisterTraceWriter, void(uint32_t));
};

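// Test fixture: sets up a SharedMemoryArbiterImpl on top of the aligned
// buffer provided by AlignedBufferTest, wired to the mock endpoint above and
// a TestTaskRunner.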
class SharedMemoryArbiterImplTest : public AlignedBufferTest {
 public:
  void SetUp() override {
    AlignedBufferTest::SetUp();
    task_runner_.reset(new base::TestTaskRunner());
    arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                               &mock_producer_endpoint_,
                                               task_runner_.get()));
  }

  bool IsArbiterFullyBound() { return arbiter_->fully_bound_; }

  void TearDown() override {
    arbiter_.reset();
    task_runner_.reset();
  }

  std::unique_ptr<base::TestTaskRunner> task_runner_;
  std::unique_ptr<SharedMemoryArbiterImpl> arbiter_;
  MockProducerEndpoint mock_producer_endpoint_;
  std::function<void(const std::vector<uint32_t>&)> on_pages_complete_;
};

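// Every test in this suite runs once per SMB page size listed below.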
size_t const kPageSizes[] = {4096, 65536};
INSTANTIATE_TEST_SUITE_P(PageSize,
                         SharedMemoryArbiterImplTest,
                         ::testing::ValuesIn(kPageSizes));

// The buffer has 14 pages (kNumPages), each of which will be partitioned into
// 14 chunks. The test requests 30 chunks (2 full pages + 2 chunks from a 3rd
// page) and releases them in different batches. It checks the consistency of
// the batches and the release order.
TEST_P(SharedMemoryArbiterImplTest, GetAndReturnChunks) {
  SharedMemoryArbiterImpl::set_default_layout_for_testing(
      SharedMemoryABI::PageLayout::kPageDiv14);
  static constexpr size_t kTotChunks = kNumPages * 14;
  SharedMemoryABI::Chunk chunks[kTotChunks];
  for (size_t i = 0; i < 14 * 2 + 2; i++) {
    chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kStall);
    ASSERT_TRUE(chunks[i].is_valid());
  }

  // Now return the first 28 chunks (the 2 full pages) and only the 2nd chunk
  // of the 3rd page. Chunks are released in interleaved order:
  // 1,0,3,2,5,4,7,6,... Check that the notification callback is posted and
  // that the order is consistent.
  auto on_commit_1 = task_runner_->CreateCheckpoint("on_commit_1");
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([on_commit_1](const CommitDataRequest& req,
                                     MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(14 * 2 + 1, req.chunks_to_move_size());
        for (size_t i = 0; i < 14 * 2; i++) {
          ASSERT_EQ(i / 14, req.chunks_to_move()[i].page());
          ASSERT_EQ((i % 14) ^ 1, req.chunks_to_move()[i].chunk());
          ASSERT_EQ(i % 5 + 1, req.chunks_to_move()[i].target_buffer());
        }
        ASSERT_EQ(2u, req.chunks_to_move()[28].page());
        ASSERT_EQ(1u, req.chunks_to_move()[28].chunk());
        ASSERT_EQ(42u, req.chunks_to_move()[28].target_buffer());
        on_commit_1();
      }));
  PatchList ignored;
  for (size_t i = 0; i < 14 * 2; i++) {
    arbiter_->ReturnCompletedChunk(std::move(chunks[i ^ 1]), i % 5 + 1,
                                   &ignored);
  }
  arbiter_->ReturnCompletedChunk(std::move(chunks[29]), 42, &ignored);
  task_runner_->RunUntilCheckpoint("on_commit_1");

  // Then release the 1st chunk of the 3rd page, and check that we get a
  // notification for that as well.
  auto on_commit_2 = task_runner_->CreateCheckpoint("on_commit_2");
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([on_commit_2](const CommitDataRequest& req,
                                     MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        ASSERT_EQ(2u, req.chunks_to_move()[0].page());
        ASSERT_EQ(0u, req.chunks_to_move()[0].chunk());
        ASSERT_EQ(43u, req.chunks_to_move()[0].target_buffer());
        on_commit_2();
      }));
  arbiter_->ReturnCompletedChunk(std::move(chunks[28]), 43, &ignored);
  task_runner_->RunUntilCheckpoint("on_commit_2");
}

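// Checks commit batching: with a batching period of zero, commits are sent
// immediately; with a (simulated) long period, they are accumulated and sent
// as a single CommitData request when the period ends.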
TEST_P(SharedMemoryArbiterImplTest, BatchCommits) {
  SharedMemoryArbiterImpl::set_default_layout_for_testing(
      SharedMemoryABI::PageLayout::kPageDiv1);

  // Batching period is 0s - chunks are being committed as soon as they are
  // returned.
  SharedMemoryABI::Chunk chunk =
      arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(1);
  PatchList ignored;
  arbiter_->ReturnCompletedChunk(std::move(chunk), 0, &ignored);
  task_runner_->RunUntilIdle();
  ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));

  // Since we cannot explicitly control the passage of time in task_runner_, to
  // simulate a non-zero batching period and a commit at the end of it, set the
  // batching duration to a very large value and call
  // FlushPendingCommitDataRequests to manually trigger the commit.
  arbiter_->SetDirectSMBPatchingSupportedByService();
  ASSERT_TRUE(arbiter_->EnableDirectSMBPatching());
  arbiter_->SetBatchCommitsDuration(UINT32_MAX);

  // First chunk that will be batched. CommitData should not be called
  // immediately this time.
  chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(0);
  // We'll pretend that the chunk needs patching. This is done in order to
  // verify that chunks that need patching are not marked as complete (i.e.
  // they are kept in state kChunkBeingWritten) before the batching period
  // ends - in case a patch for them arrives during the batching period.
  chunk.SetFlag(SharedMemoryABI::ChunkHeader::kChunkNeedsPatching);
  arbiter_->ReturnCompletedChunk(std::move(chunk), 1, &ignored);
  task_runner_->RunUntilIdle();
  ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));
  ASSERT_EQ(SharedMemoryABI::kChunkBeingWritten,
            arbiter_->shmem_abi_for_testing()->GetChunkState(1u, 0u));

  // Add a second chunk to the batch. This should also not trigger an immediate
  // call to CommitData.
  chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDefault);
  ASSERT_TRUE(chunk.is_valid());
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _)).Times(0);
  arbiter_->ReturnCompletedChunk(std::move(chunk), 2, &ignored);
  task_runner_->RunUntilIdle();
  ASSERT_TRUE(Mock::VerifyAndClearExpectations(&mock_producer_endpoint_));
  // This chunk does not need patching, so it should be marked as complete even
  // before the end of the batching period - to allow the service to read it in
  // full.
  ASSERT_EQ(SharedMemoryABI::kChunkComplete,
            arbiter_->shmem_abi_for_testing()->GetChunkState(2u, 0u));

  // Make sure that CommitData gets called once (should happen at the end of
  // the batching period), with the two chunks in the batch.
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(2, req.chunks_to_move_size());

        // Verify that this is the first chunk that we expect to have been
        // batched.
        ASSERT_EQ(1u, req.chunks_to_move()[0].page());
        ASSERT_EQ(0u, req.chunks_to_move()[0].chunk());
        ASSERT_EQ(1u, req.chunks_to_move()[0].target_buffer());

        // Verify that this is the second chunk that we expect to have been
        // batched.
        ASSERT_EQ(2u, req.chunks_to_move()[1].page());
        ASSERT_EQ(0u, req.chunks_to_move()[1].chunk());
        ASSERT_EQ(2u, req.chunks_to_move()[1].target_buffer());
      }));

  // Pretend we've reached the end of the batching period.
  arbiter_->FlushPendingCommitDataRequests();
}

// Helper for verifying trace writer id allocations.
class TraceWriterIdChecker : public FakeProducerEndpoint {
 public:
  TraceWriterIdChecker(std::function<void()> checkpoint)
      : checkpoint_(std::move(checkpoint)) {}

  void RegisterTraceWriter(uint32_t id, uint32_t) override {
    EXPECT_GT(id, 0u);
    EXPECT_LE(id, kMaxWriterID);
    if (id > 0 && id <= kMaxWriterID) {
      registered_ids_.set(id - 1);
    }
  }

  void UnregisterTraceWriter(uint32_t id) override {
    if (++unregister_calls_ == kMaxWriterID)
      checkpoint_();

    EXPECT_GT(id, 0u);
    EXPECT_LE(id, kMaxWriterID);
    if (id > 0 && id <= kMaxWriterID) {
      unregistered_ids_.set(id - 1);
    }
  }

  // Bit N corresponds to id N + 1.
  std::bitset<kMaxWriterID> registered_ids_;
  std::bitset<kMaxWriterID> unregistered_ids_;

  int unregister_calls_ = 0;

 private:
  std::function<void()> checkpoint_;
};

// Check that we can actually create up to kMaxWriterID TraceWriter(s).
TEST_P(SharedMemoryArbiterImplTest, WriterIDsAllocation) {
  auto checkpoint = task_runner_->CreateCheckpoint("last_unregistered");

  TraceWriterIdChecker id_checking_endpoint(checkpoint);
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             &id_checking_endpoint,
                                             task_runner_.get()));
  {
    std::map<WriterID, std::unique_ptr<TraceWriter>> writers;

    for (size_t i = 0; i < kMaxWriterID; i++) {
      std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
      ASSERT_TRUE(writer);
      WriterID writer_id = writer->writer_id();
      ASSERT_TRUE(writers.emplace(writer_id, std::move(writer)).second);
    }

    // A further call should return a null impl of trace writer as we exhausted
    // writer IDs.
    ASSERT_EQ(arbiter_->CreateTraceWriter(1)->writer_id(), 0);
  }

  // This should run the Register/UnregisterTraceWriter tasks enqueued by the
  // memory arbiter.
  task_runner_->RunUntilCheckpoint("last_unregistered", 15000);

  EXPECT_TRUE(id_checking_endpoint.registered_ids_.all());
  EXPECT_TRUE(id_checking_endpoint.unregistered_ids_.all());
}

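// Once TryShutdown() has been attempted, the arbiter only hands out null
// trace writers; shutdown itself succeeds only after all pre-existing writers
// have been destroyed.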
TEST_P(SharedMemoryArbiterImplTest, Shutdown) {
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
  EXPECT_TRUE(writer);
  EXPECT_FALSE(arbiter_->TryShutdown());

  // We still get a valid trace writer after shutdown, but it's a null one
  // that's not connected to the arbiter.
  std::unique_ptr<TraceWriter> writer2 = arbiter_->CreateTraceWriter(2);
  EXPECT_TRUE(writer2);
  EXPECT_EQ(writer2->writer_id(), 0);

  // Shutdown will succeed once the only non-null writer goes away.
  writer.reset();
  EXPECT_TRUE(arbiter_->TryShutdown());
}

// Verify that getting a new chunk doesn't stall when kDrop policy is chosen.
TEST_P(SharedMemoryArbiterImplTest, BufferExhaustedPolicyDrop) {
  // Grab all chunks in the SMB.
  SharedMemoryArbiterImpl::set_default_layout_for_testing(
      SharedMemoryABI::PageLayout::kPageDiv1);
  static constexpr size_t kTotChunks = kNumPages;
  SharedMemoryABI::Chunk chunks[kTotChunks];
  for (size_t i = 0; i < kTotChunks; i++) {
    chunks[i] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
    ASSERT_TRUE(chunks[i].is_valid());
  }

  // SMB is exhausted, thus GetNewChunk() should return an invalid chunk. In
  // kStall mode, this would stall.
  SharedMemoryABI::Chunk invalid_chunk =
      arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_FALSE(invalid_chunk.is_valid());

  // Returning the chunk is not enough to be able to reacquire it.
  PatchList ignored;
  arbiter_->ReturnCompletedChunk(std::move(chunks[0]), 1, &ignored);

  invalid_chunk = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_FALSE(invalid_chunk.is_valid());

  // After releasing the chunk as free, we can reacquire it.
  chunks[0] =
      arbiter_->shmem_abi_for_testing()->TryAcquireChunkForReading(0, 0);
  ASSERT_TRUE(chunks[0].is_valid());
  arbiter_->shmem_abi_for_testing()->ReleaseChunkAsFree(std::move(chunks[0]));

  chunks[0] = arbiter_->GetNewChunk({}, BufferExhaustedPolicy::kDrop);
  ASSERT_TRUE(chunks[0].is_valid());
}

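// An arbiter created without an endpoint or task runner starts out unbound.
// Binding it should make it fully bound, after which writer registration and
// commits are sent to the endpoint right away.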
TEST_P(SharedMemoryArbiterImplTest, CreateUnboundAndBind) {
  auto checkpoint_writer = task_runner_->CreateCheckpoint("writer_registered");
  auto checkpoint_flush = task_runner_->CreateCheckpoint("flush_completed");

  // Create an unbound arbiter and bind immediately.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
  EXPECT_TRUE(IsArbiterFullyBound());

  // Trace writer should be registered in a non-delayed task.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 1))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  std::unique_ptr<TraceWriter> writer = arbiter_->CreateTraceWriter(1);
  task_runner_->RunUntilCheckpoint("writer_registered", 5000);

  // Commits/flushes should be sent right away.
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(testing::InvokeArgument<1>());
  writer->Flush(checkpoint_flush);
  task_runner_->RunUntilCheckpoint("flush_completed", 5000);
}

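// Covers startup tracing: commits made against an unbound buffer reservation
// are queued by the arbiter and only flushed (with the final target buffer
// ID) once the reservation is bound via BindStartupTargetBuffer().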
TEST_P(SharedMemoryArbiterImplTest, StartupTracing) {
  constexpr uint16_t kTargetBufferReservationId1 = 1;
  constexpr uint16_t kTargetBufferReservationId2 = 2;

  // Create an unbound arbiter and a startup writer.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // Write two packets while unbound and flush the chunk after each packet. The
  // writer will return the chunk to the arbiter and grab a new chunk for the
  // second packet. The flush should only add the chunk into the queued commit
  // request.
  for (int i = 0; i < 2; i++) {
    {
      auto packet = writer->NewTracePacket();
      packet->set_for_testing()->set_str("foo");
    }
    writer->Flush();
  }

  // Bind to producer endpoint. This should not register the trace writer yet,
  // because its buffer reservation is still unbound.
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write another packet into another chunk and queue it.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  bool flush_completed = false;
  writer->Flush([&flush_completed] { flush_completed = true; });

  // Bind the buffer reservation to a buffer. Trace writer should be registered
  // and queued commits flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42));
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(3, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        EXPECT_EQ(42u, req.chunks_to_move()[1].target_buffer());
        EXPECT_EQ(42u, req.chunks_to_move()[2].target_buffer());
        callback();
      }));

  arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId1, 42);
  EXPECT_TRUE(IsArbiterFullyBound());

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);

  // Creating a new startup writer for the same buffer posts an immediate task
  // to register it.
  auto checkpoint_register1b =
      task_runner_->CreateCheckpoint("writer1b_registered");
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 42))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_register1b));
  std::unique_ptr<TraceWriter> writer1b =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
  task_runner_->RunUntilCheckpoint("writer1b_registered", 5000);

  // And a commit on this new writer should be flushed to the right buffer,
  // too.
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        callback();
      }));
  {
    auto packet = writer1b->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  flush_completed = false;
  writer1b->Flush([&flush_completed] { flush_completed = true; });

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);

  // Create another startup writer for another target buffer, which puts the
  // arbiter back into unbound state.
  std::unique_ptr<TraceWriter> writer2 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write a chunk into both writers. Both should be queued up into the next
  // commit request.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  writer->Flush();
  {
    auto packet = writer2->NewTracePacket();
    packet->set_for_testing()->set_str("bar");
  }
  flush_completed = false;
  writer2->Flush([&flush_completed] { flush_completed = true; });

  // Destroy the first trace writer, which should cause the arbiter to post a
  // task to unregister it.
  auto checkpoint_writer =
      task_runner_->CreateCheckpoint("writer_unregistered");
  EXPECT_CALL(mock_producer_endpoint_,
              UnregisterTraceWriter(writer->writer_id()))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  writer.reset();
  task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);

  // Bind the second buffer reservation to a buffer. Second trace writer should
  // be registered and queued commits flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, 23));
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([](const CommitDataRequest& req,
                          MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        EXPECT_EQ(42u, req.chunks_to_move()[0].target_buffer());
        EXPECT_EQ(23u, req.chunks_to_move()[1].target_buffer());
        callback();
      }));

  arbiter_->BindStartupTargetBuffer(kTargetBufferReservationId2, 23);
  EXPECT_TRUE(IsArbiterFullyBound());

  testing::Mock::VerifyAndClearExpectations(&mock_producer_endpoint_);
  EXPECT_TRUE(flush_completed);
}

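// Aborting a startup tracing session should re-target its queued commits to
// the invalid buffer ID 0 (so the service discards them) and must not crash
// when writers created before the abort are used or destroyed afterwards.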
TEST_P(SharedMemoryArbiterImplTest, AbortStartupTracingForReservation) {
  constexpr uint16_t kTargetBufferReservationId1 = 1;
  constexpr uint16_t kTargetBufferReservationId2 = 2;

  // Create an unbound arbiter and two startup writers.
  arbiter_.reset(new SharedMemoryArbiterImpl(buf(), buf_size(), page_size(),
                                             nullptr, nullptr));
  SharedMemoryABI* shmem_abi = arbiter_->shmem_abi_for_testing();
  std::unique_ptr<TraceWriter> writer =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);
  std::unique_ptr<TraceWriter> writer2 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // Write two packets while unbound and flush the chunk after each packet. The
  // writer will return the chunk to the arbiter and grab a new chunk for the
  // second packet. The flush should only add the chunk into the queued commit
  // request.
  for (int i = 0; i < 2; i++) {
    {
      auto packet = writer->NewTracePacket();
      packet->set_for_testing()->set_str("foo");
    }
    writer->Flush();
  }

  // Abort the first session. This should resolve the two chunks committed up
  // to this point to an invalid target buffer (ID 0). They will remain
  // buffered until the arbiter is bound to an endpoint.
  arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId1);

  // Destroy a writer that was created before the abort. This should not cause
  // crashes.
  writer2.reset();

  // Bind to producer endpoint. The trace writer should not be registered as
  // its target buffer is invalid. Since no startup sessions are active
  // anymore, the arbiter should be fully bound. The commit data request is
  // flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](const CommitDataRequest& req,
                                   MockProducerEndpoint::CommitDataCallback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        for (size_t i = 0; i < 2; i++) {
          EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
          SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
              req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
          shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        }
      }));
  arbiter_->BindToProducerEndpoint(&mock_producer_endpoint_,
                                   task_runner_.get());
  EXPECT_TRUE(IsArbiterFullyBound());

  // SMB should be free again, as no writer holds on to any chunk anymore.
  for (size_t i = 0; i < shmem_abi->num_pages(); i++)
    EXPECT_TRUE(shmem_abi->is_page_free(i));

  // Write another packet into another chunk and commit it. It should be
  // committed with the invalid target buffer (ID 0).
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
        SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
            req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
        shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        callback();
      }));
  bool flush_completed = false;
  writer->Flush([&flush_completed] { flush_completed = true; });
  EXPECT_TRUE(flush_completed);

  // Creating a new startup writer for the same buffer does not cause it to
  // register.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  std::unique_ptr<TraceWriter> writer1b =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId1);

  // And a commit on this new writer should again be flushed to the invalid
  // target buffer.
  {
    auto packet = writer1b->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(1, req.chunks_to_move_size());
        EXPECT_EQ(0u, req.chunks_to_move()[0].target_buffer());
        SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
            req.chunks_to_move()[0].page(), req.chunks_to_move()[0].chunk());
        shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        callback();
      }));
  flush_completed = false;
  writer1b->Flush([&flush_completed] { flush_completed = true; });
  EXPECT_TRUE(flush_completed);

  // Create another startup writer for another target buffer, which puts the
  // arbiter back into unbound state.
  std::unique_ptr<TraceWriter> writer3 =
      arbiter_->CreateStartupTraceWriter(kTargetBufferReservationId2);
  EXPECT_FALSE(IsArbiterFullyBound());

  // Write a chunk into both writers. Both should be queued up into the next
  // commit request.
  {
    auto packet = writer->NewTracePacket();
    packet->set_for_testing()->set_str("foo");
  }
  writer->Flush();
  {
    auto packet = writer3->NewTracePacket();
    packet->set_for_testing()->set_str("bar");
  }
  flush_completed = false;
  writer3->Flush([&flush_completed] { flush_completed = true; });

  // Destroy the first trace writer, which should cause the arbiter to post a
  // task to unregister it.
  auto checkpoint_writer =
      task_runner_->CreateCheckpoint("writer_unregistered");
  EXPECT_CALL(mock_producer_endpoint_,
              UnregisterTraceWriter(writer->writer_id()))
      .WillOnce(testing::InvokeWithoutArgs(checkpoint_writer));
  writer.reset();
  task_runner_->RunUntilCheckpoint("writer_unregistered", 5000);

  // Abort the second session. Its commits should now also be associated with
  // target buffer 0, and both writers' commits flushed.
  EXPECT_CALL(mock_producer_endpoint_, RegisterTraceWriter(_, _)).Times(0);
  EXPECT_CALL(mock_producer_endpoint_, CommitData(_, _))
      .WillOnce(Invoke([shmem_abi](
                           const CommitDataRequest& req,
                           MockProducerEndpoint::CommitDataCallback callback) {
        ASSERT_EQ(2, req.chunks_to_move_size());
        for (size_t i = 0; i < 2; i++) {
          EXPECT_EQ(0u, req.chunks_to_move()[i].target_buffer());
          SharedMemoryABI::Chunk chunk = shmem_abi->TryAcquireChunkForReading(
              req.chunks_to_move()[i].page(), req.chunks_to_move()[i].chunk());
          shmem_abi->ReleaseChunkAsFree(std::move(chunk));
        }
        callback();
      }));

  arbiter_->AbortStartupTracingForReservation(kTargetBufferReservationId2);
  EXPECT_TRUE(IsArbiterFullyBound());
  EXPECT_TRUE(flush_completed);

  // SMB should be free again, as no writer holds on to any chunk anymore.
  for (size_t i = 0; i < shmem_abi->num_pages(); i++)
    EXPECT_TRUE(shmem_abi->is_page_free(i));
}

// TODO(primiano): add multi-threaded tests.

}  // namespace perfetto