1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <inttypes.h>
18
19 #include "perfetto/ext/base/temp_file.h"
20 #include "perfetto/ext/tracing/core/consumer.h"
21 #include "perfetto/ext/tracing/core/producer.h"
22 #include "perfetto/ext/tracing/core/trace_packet.h"
23 #include "perfetto/ext/tracing/core/trace_stats.h"
24 #include "perfetto/ext/tracing/core/trace_writer.h"
25 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
26 #include "perfetto/ext/tracing/ipc/producer_ipc_client.h"
27 #include "perfetto/ext/tracing/ipc/service_ipc_host.h"
28 #include "perfetto/tracing/core/data_source_config.h"
29 #include "perfetto/tracing/core/data_source_descriptor.h"
30 #include "perfetto/tracing/core/trace_config.h"
31 #include "src/base/test/test_task_runner.h"
32 #include "src/ipc/test/test_socket.h"
33 #include "src/tracing/core/tracing_service_impl.h"
34 #include "test/gtest_and_gmock.h"
35
36 #include "protos/perfetto/config/trace_config.gen.h"
37 #include "protos/perfetto/trace/clock_snapshot.gen.h"
38 #include "protos/perfetto/trace/test_event.gen.h"
39 #include "protos/perfetto/trace/test_event.pbzero.h"
40 #include "protos/perfetto/trace/trace.gen.h"
41 #include "protos/perfetto/trace/trace_packet.gen.h"
42 #include "protos/perfetto/trace/trace_packet.pbzero.h"
43
44 namespace perfetto {
45 namespace {
46
47 using testing::_;
48 using testing::Invoke;
49 using testing::InvokeWithoutArgs;
50
51 ipc::TestSocket kProducerSock{"tracing_test-producer"};
52 ipc::TestSocket kConsumerSock{"tracing_test-consumer"};
53
54 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
55 class MockProducer : public Producer {
56 public:
~MockProducer()57 ~MockProducer() override {}
58
59 // Producer implementation.
60 MOCK_METHOD0(OnConnect, void());
61 MOCK_METHOD0(OnDisconnect, void());
62 MOCK_METHOD2(SetupDataSource,
63 void(DataSourceInstanceID, const DataSourceConfig&));
64 MOCK_METHOD2(StartDataSource,
65 void(DataSourceInstanceID, const DataSourceConfig&));
66 MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
67 MOCK_METHOD0(uid, uid_t());
68 MOCK_METHOD0(OnTracingSetup, void());
69 MOCK_METHOD3(Flush,
70 void(FlushRequestID, const DataSourceInstanceID*, size_t));
71 MOCK_METHOD2(ClearIncrementalState,
72 void(const DataSourceInstanceID*, size_t));
73 };
74
75 class MockConsumer : public Consumer {
76 public:
~MockConsumer()77 ~MockConsumer() override {}
78
79 // Producer implementation.
80 MOCK_METHOD0(OnConnect, void());
81 MOCK_METHOD0(OnDisconnect, void());
82 MOCK_METHOD1(OnTracingDisabled, void(const std::string& /*error*/));
83 MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
84 MOCK_METHOD1(OnDetach, void(bool));
85 MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
86 MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
87 MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
88
89 // Workaround, gmock doesn't support yet move-only types, passing a pointer.
OnTraceData(std::vector<TracePacket> packets,bool has_more)90 void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
91 OnTracePackets(&packets, has_more);
92 }
93 };
94
CheckTraceStats(const protos::gen::TracePacket & packet)95 void CheckTraceStats(const protos::gen::TracePacket& packet) {
96 EXPECT_TRUE(packet.has_trace_stats());
97 EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
98 EXPECT_EQ(1u, packet.trace_stats().producers_connected());
99 EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
100 EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
101 EXPECT_EQ(1u, packet.trace_stats().total_buffers());
102 EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
103
104 const auto& buf_stats = packet.trace_stats().buffer_stats()[0];
105 EXPECT_GT(buf_stats.bytes_written(), 0u);
106 EXPECT_GT(buf_stats.chunks_written(), 0u);
107 EXPECT_EQ(0u, buf_stats.chunks_overwritten());
108 EXPECT_EQ(0u, buf_stats.chunks_rewritten());
109 EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
110 EXPECT_EQ(0u, buf_stats.write_wrap_count());
111 EXPECT_EQ(0u, buf_stats.patches_failed());
112 EXPECT_EQ(0u, buf_stats.readaheads_failed());
113 EXPECT_EQ(0u, buf_stats.abi_violations());
114 }
115
116 } // namespace
117
118 class TracingIntegrationTest : public ::testing::Test {
119 public:
SetUp()120 void SetUp() override {
121 kProducerSock.Destroy();
122 kConsumerSock.Destroy();
123 task_runner_.reset(new base::TestTaskRunner());
124
125 // Create the service host.
126 svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
127 svc_->Start(kProducerSock.name(), kConsumerSock.name());
128
129 // Create and connect a Producer.
130 producer_endpoint_ = ProducerIPCClient::Connect(
131 kProducerSock.name(), &producer_, "perfetto.mock_producer",
132 task_runner_.get(), GetProducerSMBScrapingMode());
133 auto on_producer_connect =
134 task_runner_->CreateCheckpoint("on_producer_connect");
135 EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
136 task_runner_->RunUntilCheckpoint("on_producer_connect");
137
138 // Register a data source.
139 DataSourceDescriptor ds_desc;
140 ds_desc.set_name("perfetto.test");
141 producer_endpoint_->RegisterDataSource(ds_desc);
142
143 // Create and connect a Consumer.
144 consumer_endpoint_ = ConsumerIPCClient::Connect(
145 kConsumerSock.name(), &consumer_, task_runner_.get());
146 auto on_consumer_connect =
147 task_runner_->CreateCheckpoint("on_consumer_connect");
148 EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
149 task_runner_->RunUntilCheckpoint("on_consumer_connect");
150
151 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
152 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
153 }
154
TearDown()155 void TearDown() override {
156 // Destroy the service and check that both Producer and Consumer see an
157 // OnDisconnect() call.
158
159 auto on_producer_disconnect =
160 task_runner_->CreateCheckpoint("on_producer_disconnect");
161 EXPECT_CALL(producer_, OnDisconnect())
162 .WillOnce(Invoke(on_producer_disconnect));
163
164 auto on_consumer_disconnect =
165 task_runner_->CreateCheckpoint("on_consumer_disconnect");
166 EXPECT_CALL(consumer_, OnDisconnect())
167 .WillOnce(Invoke(on_consumer_disconnect));
168
169 svc_.reset();
170 task_runner_->RunUntilCheckpoint("on_producer_disconnect");
171 task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
172
173 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
174 ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
175
176 task_runner_.reset();
177 kProducerSock.Destroy();
178 kConsumerSock.Destroy();
179 }
180
GetProducerSMBScrapingMode()181 virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
182 return TracingService::ProducerSMBScrapingMode::kDefault;
183 }
184
WaitForTraceWritersChanged(ProducerID producer_id)185 void WaitForTraceWritersChanged(ProducerID producer_id) {
186 static int i = 0;
187 auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
188 "_" + std::to_string(i++);
189 auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
190 auto writers = GetWriters(producer_id);
191 std::function<void()> task;
192 task = [&task, writers, writers_changed, producer_id, this]() {
193 if (writers != GetWriters(producer_id)) {
194 writers_changed();
195 return;
196 }
197 task_runner_->PostDelayedTask(task, 1);
198 };
199 task_runner_->PostDelayedTask(task, 1);
200 task_runner_->RunUntilCheckpoint(checkpoint_name);
201 }
202
GetWriters(ProducerID producer_id)203 const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
204 return reinterpret_cast<TracingServiceImpl*>(svc_->service())
205 ->GetProducer(producer_id)
206 ->writers_;
207 }
208
last_producer_id()209 ProducerID* last_producer_id() {
210 return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
211 ->last_producer_id_;
212 }
213
214 std::unique_ptr<base::TestTaskRunner> task_runner_;
215 std::unique_ptr<ServiceIPCHost> svc_;
216 std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
217 MockProducer producer_;
218 std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
219 MockConsumer consumer_;
220 };
221
TEST_F(TracingIntegrationTest,WithIPCTransport)222 TEST_F(TracingIntegrationTest, WithIPCTransport) {
223 // Start tracing.
224 TraceConfig trace_config;
225 trace_config.add_buffers()->set_size_kb(4096 * 10);
226 auto* ds_config = trace_config.add_data_sources()->mutable_config();
227 ds_config->set_name("perfetto.test");
228 ds_config->set_target_buffer(0);
229 consumer_endpoint_->EnableTracing(trace_config);
230
231 // At this point, the Producer should be asked to turn its data source on.
232 DataSourceInstanceID ds_iid = 0;
233
234 BufferID global_buf_id = 0;
235 auto on_create_ds_instance =
236 task_runner_->CreateCheckpoint("on_create_ds_instance");
237 EXPECT_CALL(producer_, OnTracingSetup());
238
239 // Store the arguments passed to SetupDataSource() and later check that they
240 // match the ones passed to StartDataSource().
241 DataSourceInstanceID setup_id;
242 DataSourceConfig setup_cfg_proto;
243 EXPECT_CALL(producer_, SetupDataSource(_, _))
244 .WillOnce(
245 Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
246 const DataSourceConfig& cfg) {
247 setup_id = id;
248 setup_cfg_proto = cfg;
249 }));
250 EXPECT_CALL(producer_, StartDataSource(_, _))
251 .WillOnce(
252 Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
253 &setup_cfg_proto](DataSourceInstanceID id,
254 const DataSourceConfig& cfg) {
255 // id and config should match the ones passed to SetupDataSource.
256 ASSERT_EQ(id, setup_id);
257 ASSERT_EQ(setup_cfg_proto, cfg);
258 ASSERT_NE(0u, id);
259 ds_iid = id;
260 ASSERT_EQ("perfetto.test", cfg.name());
261 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
262 ASSERT_NE(0u, global_buf_id);
263 ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
264 on_create_ds_instance();
265 }));
266 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
267
268 // Now let the data source fill some pages within the same task.
269 // Doing so should accumulate a bunch of chunks that will be notified by the
270 // a future task in one batch.
271 std::unique_ptr<TraceWriter> writer =
272 producer_endpoint_->CreateTraceWriter(global_buf_id);
273 ASSERT_TRUE(writer);
274
275 const size_t kNumPackets = 10;
276 for (size_t i = 0; i < kNumPackets; i++) {
277 char buf[16];
278 sprintf(buf, "evt_%zu", i);
279 writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
280 }
281
282 // Allow the service to see the CommitData() before reading back.
283 auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
284 writer->Flush(on_data_committed);
285 task_runner_->RunUntilCheckpoint("on_data_committed");
286
287 // Read the log buffer.
288 consumer_endpoint_->ReadBuffers();
289 size_t num_pack_rx = 0;
290 bool saw_clock_snapshot = false;
291 bool saw_trace_config = false;
292 bool saw_trace_stats = false;
293 auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
294 EXPECT_CALL(consumer_, OnTracePackets(_, _))
295 .WillRepeatedly(
296 Invoke([&num_pack_rx, all_packets_rx, &trace_config,
297 &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
298 std::vector<TracePacket>* packets, bool has_more) {
299 #if PERFETTO_BUILDFLAG(PERFETTO_OS_APPLE)
300 const int kExpectedMinNumberOfClocks = 1;
301 #elif PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
302 const int kExpectedMinNumberOfClocks = 2;
303 #else
304 const int kExpectedMinNumberOfClocks = 6;
305 #endif
306
307 for (auto& encoded_packet : *packets) {
308 protos::gen::TracePacket packet;
309 ASSERT_TRUE(packet.ParseFromString(
310 encoded_packet.GetRawBytesForTesting()));
311 if (packet.has_for_testing()) {
312 char buf[8];
313 sprintf(buf, "evt_%zu", num_pack_rx++);
314 EXPECT_EQ(std::string(buf), packet.for_testing().str());
315 } else if (packet.has_clock_snapshot()) {
316 EXPECT_GE(packet.clock_snapshot().clocks_size(),
317 kExpectedMinNumberOfClocks);
318 saw_clock_snapshot = true;
319 } else if (packet.has_trace_config()) {
320 EXPECT_EQ(packet.trace_config(), trace_config);
321 saw_trace_config = true;
322 } else if (packet.has_trace_stats()) {
323 saw_trace_stats = true;
324 CheckTraceStats(packet);
325 }
326 }
327 if (!has_more)
328 all_packets_rx();
329 }));
330 task_runner_->RunUntilCheckpoint("all_packets_rx");
331 ASSERT_EQ(kNumPackets, num_pack_rx);
332 EXPECT_TRUE(saw_clock_snapshot);
333 EXPECT_TRUE(saw_trace_config);
334 EXPECT_TRUE(saw_trace_stats);
335
336 // Disable tracing.
337 consumer_endpoint_->DisableTracing();
338
339 auto on_tracing_disabled =
340 task_runner_->CreateCheckpoint("on_tracing_disabled");
341 EXPECT_CALL(producer_, StopDataSource(_));
342 EXPECT_CALL(consumer_, OnTracingDisabled(_))
343 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
344 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
345 }
346
347 // Regression test for b/172950370.
TEST_F(TracingIntegrationTest,ValidErrorOnDisconnection)348 TEST_F(TracingIntegrationTest, ValidErrorOnDisconnection) {
349 // Start tracing.
350 TraceConfig trace_config;
351 trace_config.add_buffers()->set_size_kb(4096 * 10);
352 auto* ds_config = trace_config.add_data_sources()->mutable_config();
353 ds_config->set_name("perfetto.test");
354 consumer_endpoint_->EnableTracing(trace_config);
355
356 auto on_create_ds_instance =
357 task_runner_->CreateCheckpoint("on_create_ds_instance");
358 EXPECT_CALL(producer_, OnTracingSetup());
359
360 // Store the arguments passed to SetupDataSource() and later check that they
361 // match the ones passed to StartDataSource().
362 EXPECT_CALL(producer_, SetupDataSource(_, _));
363 EXPECT_CALL(producer_, StartDataSource(_, _))
364 .WillOnce(InvokeWithoutArgs(on_create_ds_instance));
365 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
366
367 EXPECT_CALL(consumer_, OnTracingDisabled(_))
368 .WillOnce(Invoke([](const std::string& err) {
369 EXPECT_THAT(err,
370 testing::HasSubstr("EnableTracing IPC request rejected"));
371 }));
372
373 // TearDown() will destroy the service via svc_.reset(). That will drop the
374 // connection and trigger the EXPECT_CALL(OnTracingDisabled) above.
375 }
376
377 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
TEST_F(TracingIntegrationTest,WriteIntoFile)378 TEST_F(TracingIntegrationTest, WriteIntoFile) {
379 // Start tracing.
380 TraceConfig trace_config;
381 trace_config.add_buffers()->set_size_kb(4096 * 10);
382 auto* ds_config = trace_config.add_data_sources()->mutable_config();
383 ds_config->set_name("perfetto.test");
384 ds_config->set_target_buffer(0);
385 trace_config.set_write_into_file(true);
386 base::TempFile tmp_file = base::TempFile::CreateUnlinked();
387 consumer_endpoint_->EnableTracing(trace_config,
388 base::ScopedFile(dup(tmp_file.fd())));
389
390 // At this point, the producer_ should be asked to turn its data source on.
391 BufferID global_buf_id = 0;
392 auto on_create_ds_instance =
393 task_runner_->CreateCheckpoint("on_create_ds_instance");
394 EXPECT_CALL(producer_, OnTracingSetup());
395 EXPECT_CALL(producer_, SetupDataSource(_, _));
396 EXPECT_CALL(producer_, StartDataSource(_, _))
397 .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
398 DataSourceInstanceID, const DataSourceConfig& cfg) {
399 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
400 on_create_ds_instance();
401 }));
402 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
403
404 std::unique_ptr<TraceWriter> writer =
405 producer_endpoint_->CreateTraceWriter(global_buf_id);
406 ASSERT_TRUE(writer);
407
408 const size_t kNumPackets = 10;
409 for (size_t i = 0; i < kNumPackets; i++) {
410 char buf[16];
411 sprintf(buf, "evt_%zu", i);
412 writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
413 }
414 auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
415 writer->Flush(on_data_committed);
416 task_runner_->RunUntilCheckpoint("on_data_committed");
417
418 // Will disable tracing and will force the buffers to be written into the
419 // file before destroying them.
420 consumer_endpoint_->FreeBuffers();
421
422 auto on_tracing_disabled =
423 task_runner_->CreateCheckpoint("on_tracing_disabled");
424 EXPECT_CALL(producer_, StopDataSource(_));
425 EXPECT_CALL(consumer_, OnTracingDisabled(_))
426 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
427 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
428
429 // Check that |tmp_file| contains a valid trace.proto message.
430 ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
431 char tmp_buf[1024];
432 ssize_t rsize = read(tmp_file.fd(), tmp_buf, sizeof(tmp_buf));
433 ASSERT_GT(rsize, 0);
434 protos::gen::Trace tmp_trace;
435 ASSERT_TRUE(tmp_trace.ParseFromArray(tmp_buf, static_cast<size_t>(rsize)));
436 size_t num_test_packet = 0;
437 size_t num_clock_snapshot_packet = 0;
438 size_t num_system_info_packet = 0;
439 bool saw_trace_stats = false;
440 for (int i = 0; i < tmp_trace.packet_size(); i++) {
441 const auto& packet = tmp_trace.packet()[static_cast<size_t>(i)];
442 if (packet.has_for_testing()) {
443 ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
444 packet.for_testing().str());
445 } else if (packet.has_trace_stats()) {
446 saw_trace_stats = true;
447 CheckTraceStats(packet);
448 } else if (packet.has_clock_snapshot()) {
449 num_clock_snapshot_packet++;
450 } else if (packet.has_system_info()) {
451 num_system_info_packet++;
452 }
453 }
454 ASSERT_TRUE(saw_trace_stats);
455 ASSERT_GT(num_clock_snapshot_packet, 0u);
456 ASSERT_GT(num_system_info_packet, 0u);
457 }
458 #endif
459
460 class TracingIntegrationTestWithSMBScrapingProducer
461 : public TracingIntegrationTest {
462 public:
GetProducerSMBScrapingMode()463 TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
464 override {
465 return TracingService::ProducerSMBScrapingMode::kEnabled;
466 }
467 };
468
TEST_F(TracingIntegrationTestWithSMBScrapingProducer,ScrapeOnFlush)469 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
470 // Start tracing.
471 TraceConfig trace_config;
472 trace_config.add_buffers()->set_size_kb(4096 * 10);
473 auto* ds_config = trace_config.add_data_sources()->mutable_config();
474 ds_config->set_name("perfetto.test");
475 ds_config->set_target_buffer(0);
476 consumer_endpoint_->EnableTracing(trace_config);
477
478 // At this point, the Producer should be asked to turn its data source on.
479
480 BufferID global_buf_id = 0;
481 auto on_create_ds_instance =
482 task_runner_->CreateCheckpoint("on_create_ds_instance");
483 EXPECT_CALL(producer_, OnTracingSetup());
484
485 EXPECT_CALL(producer_, SetupDataSource(_, _));
486 EXPECT_CALL(producer_, StartDataSource(_, _))
487 .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
488 DataSourceInstanceID, const DataSourceConfig& cfg) {
489 global_buf_id = static_cast<BufferID>(cfg.target_buffer());
490 on_create_ds_instance();
491 }));
492 task_runner_->RunUntilCheckpoint("on_create_ds_instance");
493
494 // Create writer, which will post a task to register the writer with the
495 // service.
496 std::unique_ptr<TraceWriter> writer =
497 producer_endpoint_->CreateTraceWriter(global_buf_id);
498 ASSERT_TRUE(writer);
499
500 // Wait for the writer to be registered.
501 WaitForTraceWritersChanged(*last_producer_id());
502
503 // Write a few trace packets.
504 writer->NewTracePacket()->set_for_testing()->set_str("payload1");
505 writer->NewTracePacket()->set_for_testing()->set_str("payload2");
506 writer->NewTracePacket()->set_for_testing()->set_str("payload3");
507
508 // Ask the service to flush, but don't flush our trace writer. This should
509 // cause our uncommitted SMB chunk to be scraped.
510 auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
511 consumer_endpoint_->Flush(5000, [on_flush_complete](bool success) {
512 EXPECT_TRUE(success);
513 on_flush_complete();
514 });
515 EXPECT_CALL(producer_, Flush(_, _, _))
516 .WillOnce(Invoke([this](FlushRequestID flush_req_id,
517 const DataSourceInstanceID*, size_t) {
518 producer_endpoint_->NotifyFlushComplete(flush_req_id);
519 }));
520 task_runner_->RunUntilCheckpoint("on_flush_complete");
521
522 // Read the log buffer. We should only see the first two written trace
523 // packets, because the service can't be sure the last one was written
524 // completely by the trace writer.
525 consumer_endpoint_->ReadBuffers();
526
527 size_t num_test_pack_rx = 0;
528 auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
529 EXPECT_CALL(consumer_, OnTracePackets(_, _))
530 .WillRepeatedly(
531 Invoke([&num_test_pack_rx, all_packets_rx](
532 std::vector<TracePacket>* packets, bool has_more) {
533 for (auto& encoded_packet : *packets) {
534 protos::gen::TracePacket packet;
535 ASSERT_TRUE(packet.ParseFromString(
536 encoded_packet.GetRawBytesForTesting()));
537 if (packet.has_for_testing()) {
538 num_test_pack_rx++;
539 }
540 }
541 if (!has_more)
542 all_packets_rx();
543 }));
544 task_runner_->RunUntilCheckpoint("all_packets_rx");
545 ASSERT_EQ(2u, num_test_pack_rx);
546
547 // Disable tracing.
548 consumer_endpoint_->DisableTracing();
549
550 auto on_tracing_disabled =
551 task_runner_->CreateCheckpoint("on_tracing_disabled");
552 auto on_stop_ds = task_runner_->CreateCheckpoint("on_stop_ds");
553 EXPECT_CALL(producer_, StopDataSource(_))
554 .WillOnce(InvokeWithoutArgs(on_stop_ds));
555 EXPECT_CALL(consumer_, OnTracingDisabled(_))
556 .WillOnce(InvokeWithoutArgs(on_tracing_disabled));
557 task_runner_->RunUntilCheckpoint("on_stop_ds");
558 task_runner_->RunUntilCheckpoint("on_tracing_disabled");
559 }
560
561 // TODO(primiano): add tests to cover:
562 // - unknown fields preserved end-to-end.
563 // - >1 data source.
564 // - >1 data consumer sharing the same data source, with different TraceBuffers.
565 // - >1 consumer with > 1 buffer each.
566 // - Consumer disconnecting in the middle of a ReadBuffers() call.
567 // - Multiple calls to DisableTracing.
568 // - Out of order Enable/Disable/FreeBuffers calls.
569 // - DisableTracing does actually freeze the buffers.
570
571 } // namespace perfetto
572