1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18
19 #include <inttypes.h>
20 #include <string.h>
21
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ext/base/version.h"
25 #include "perfetto/ext/ipc/client.h"
26 #include "perfetto/ext/tracing/core/commit_data_request.h"
27 #include "perfetto/ext/tracing/core/producer.h"
28 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
29 #include "perfetto/ext/tracing/core/trace_writer.h"
30 #include "perfetto/tracing/core/data_source_config.h"
31 #include "perfetto/tracing/core/data_source_descriptor.h"
32 #include "perfetto/tracing/core/trace_config.h"
33 #include "src/tracing/ipc/posix_shared_memory.h"
34
35 // TODO(fmayer): think to what happens when ProducerIPCClientImpl gets destroyed
36 // w.r.t. the Producer pointer. Also think to lifetime of the Producer* during
37 // the callbacks.
38
39 namespace perfetto {
40
41 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,ConnectionFlags conn_flags)42 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
43 const char* service_sock_name,
44 Producer* producer,
45 const std::string& producer_name,
46 base::TaskRunner* task_runner,
47 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
48 size_t shared_memory_size_hint_bytes,
49 size_t shared_memory_page_size_hint_bytes,
50 std::unique_ptr<SharedMemory> shm,
51 std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
52 ConnectionFlags conn_flags) {
53 return std::unique_ptr<TracingService::ProducerEndpoint>(
54 new ProducerIPCClientImpl(
55 {service_sock_name,
56 conn_flags ==
57 ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable},
58 producer, producer_name, task_runner, smb_scraping_mode,
59 shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes,
60 std::move(shm), std::move(shm_arbiter)));
61 }
62
63 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)64 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
65 ipc::Client::ConnArgs conn_args,
66 Producer* producer,
67 const std::string& producer_name,
68 base::TaskRunner* task_runner,
69 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
70 size_t shared_memory_size_hint_bytes,
71 size_t shared_memory_page_size_hint_bytes,
72 std::unique_ptr<SharedMemory> shm,
73 std::unique_ptr<SharedMemoryArbiter> shm_arbiter) {
74 return std::unique_ptr<TracingService::ProducerEndpoint>(
75 new ProducerIPCClientImpl(std::move(conn_args), producer, producer_name,
76 task_runner, smb_scraping_mode,
77 shared_memory_size_hint_bytes,
78 shared_memory_page_size_hint_bytes,
79 std::move(shm), std::move(shm_arbiter)));
80 }
81
ProducerIPCClientImpl(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)82 ProducerIPCClientImpl::ProducerIPCClientImpl(
83 ipc::Client::ConnArgs conn_args,
84 Producer* producer,
85 const std::string& producer_name,
86 base::TaskRunner* task_runner,
87 TracingService::ProducerSMBScrapingMode smb_scraping_mode,
88 size_t shared_memory_size_hint_bytes,
89 size_t shared_memory_page_size_hint_bytes,
90 std::unique_ptr<SharedMemory> shm,
91 std::unique_ptr<SharedMemoryArbiter> shm_arbiter)
92 : producer_(producer),
93 task_runner_(task_runner),
94 ipc_channel_(
95 ipc::Client::CreateInstance(std::move(conn_args), task_runner)),
96 producer_port_(this /* event_listener */),
97 shared_memory_(std::move(shm)),
98 shared_memory_arbiter_(std::move(shm_arbiter)),
99 name_(producer_name),
100 shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes),
101 shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes),
102 smb_scraping_mode_(smb_scraping_mode) {
103 // Check for producer-provided SMB (used by Chrome for startup tracing).
104 if (shared_memory_) {
105 // We also expect a valid (unbound) arbiter. Bind it to this endpoint now.
106 PERFETTO_CHECK(shared_memory_arbiter_);
107 shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_);
108
109 // If the service accepts our SMB, then it must match our requested page
110 // layout. The protocol doesn't allow the service to change the size and
111 // layout when the SMB is provided by the producer.
112 shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024;
113 }
114
115 ipc_channel_->BindService(producer_port_.GetWeakPtr());
116 PERFETTO_DCHECK_THREAD(thread_checker_);
117 }
118
~ProducerIPCClientImpl()119 ProducerIPCClientImpl::~ProducerIPCClientImpl() {
120 PERFETTO_DCHECK_THREAD(thread_checker_);
121 }
122
123 // Called by the IPC layer if the BindService() succeeds.
OnConnect()124 void ProducerIPCClientImpl::OnConnect() {
125 PERFETTO_DCHECK_THREAD(thread_checker_);
126 connected_ = true;
127
128 // The IPC layer guarantees that any outstanding callback will be dropped on
129 // the floor if producer_port_ is destroyed between the request and the reply.
130 // Binding |this| is hence safe.
131 ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
132 on_init.Bind(
133 [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
134 OnConnectionInitialized(
135 resp.success(),
136 resp.success() ? resp->using_shmem_provided_by_producer() : false,
137 resp.success() ? resp->direct_smb_patching_supported() : false);
138 });
139 protos::gen::InitializeConnectionRequest req;
140 req.set_producer_name(name_);
141 req.set_shared_memory_size_hint_bytes(
142 static_cast<uint32_t>(shared_memory_size_hint_bytes_));
143 req.set_shared_memory_page_size_hint_bytes(
144 static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
145 switch (smb_scraping_mode_) {
146 case TracingService::ProducerSMBScrapingMode::kDefault:
147 // No need to set the mode, it defaults to use the service default if
148 // unspecified.
149 break;
150 case TracingService::ProducerSMBScrapingMode::kEnabled:
151 req.set_smb_scraping_mode(
152 protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
153 break;
154 case TracingService::ProducerSMBScrapingMode::kDisabled:
155 req.set_smb_scraping_mode(
156 protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
157 break;
158 }
159
160 int shm_fd = -1;
161 if (shared_memory_) {
162 shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd();
163 req.set_producer_provided_shmem(true);
164 }
165
166 #if PERFETTO_DCHECK_IS_ON()
167 req.set_build_flags(
168 protos::gen::InitializeConnectionRequest::BUILD_FLAGS_DCHECKS_ON);
169 #else
170 req.set_build_flags(
171 protos::gen::InitializeConnectionRequest::BUILD_FLAGS_DCHECKS_OFF);
172 #endif
173 req.set_sdk_version(base::GetVersionString());
174 producer_port_.InitializeConnection(req, std::move(on_init), shm_fd);
175
176 // Create the back channel to receive commands from the Service.
177 ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
178 on_cmd.Bind(
179 [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
180 if (!resp)
181 return; // The IPC channel was closed and |resp| was auto-rejected.
182 OnServiceRequest(*resp);
183 });
184 producer_port_.GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
185 std::move(on_cmd));
186
187 // If there are pending Sync() requests, send them now.
188 for (const auto& pending_sync : pending_sync_reqs_)
189 Sync(std::move(pending_sync));
190 pending_sync_reqs_.clear();
191 }
192
OnDisconnect()193 void ProducerIPCClientImpl::OnDisconnect() {
194 PERFETTO_DCHECK_THREAD(thread_checker_);
195 PERFETTO_DLOG("Tracing service connection failure");
196 connected_ = false;
197 data_sources_setup_.clear();
198 producer_->OnDisconnect(); // Note: may delete |this|.
199 }
200
OnConnectionInitialized(bool connection_succeeded,bool using_shmem_provided_by_producer,bool direct_smb_patching_supported)201 void ProducerIPCClientImpl::OnConnectionInitialized(
202 bool connection_succeeded,
203 bool using_shmem_provided_by_producer,
204 bool direct_smb_patching_supported) {
205 PERFETTO_DCHECK_THREAD(thread_checker_);
206 // If connection_succeeded == false, the OnDisconnect() call will follow next
207 // and there we'll notify the |producer_|. TODO: add a test for this.
208 if (!connection_succeeded)
209 return;
210 is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
211 direct_smb_patching_supported_ = direct_smb_patching_supported;
212 producer_->OnConnect();
213
214 // Bail out if the service failed to adopt our producer-allocated SMB.
215 // TODO(eseckler): Handle adoption failure more gracefully.
216 if (shared_memory_ && !is_shmem_provided_by_producer_) {
217 PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
218 ipc_channel_.reset();
219 return;
220 }
221 }
222
OnServiceRequest(const protos::gen::GetAsyncCommandResponse & cmd)223 void ProducerIPCClientImpl::OnServiceRequest(
224 const protos::gen::GetAsyncCommandResponse& cmd) {
225 PERFETTO_DCHECK_THREAD(thread_checker_);
226
227 // This message is sent only when connecting to a service running Android Q+.
228 // See comment below in kStartDataSource.
229 if (cmd.has_setup_data_source()) {
230 const auto& req = cmd.setup_data_source();
231 const DataSourceInstanceID dsid = req.new_instance_id();
232 data_sources_setup_.insert(dsid);
233 producer_->SetupDataSource(dsid, req.config());
234 return;
235 }
236
237 if (cmd.has_start_data_source()) {
238 const auto& req = cmd.start_data_source();
239 const DataSourceInstanceID dsid = req.new_instance_id();
240 const DataSourceConfig& cfg = req.config();
241 if (!data_sources_setup_.count(dsid)) {
242 // When connecting with an older (Android P) service, the service will not
243 // send a SetupDataSource message. We synthesize it here in that case.
244 producer_->SetupDataSource(dsid, cfg);
245 }
246 producer_->StartDataSource(dsid, cfg);
247 return;
248 }
249
250 if (cmd.has_stop_data_source()) {
251 const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
252 producer_->StopDataSource(dsid);
253 data_sources_setup_.erase(dsid);
254 return;
255 }
256
257 if (cmd.has_setup_tracing()) {
258 base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
259 if (shmem_fd) {
260 // This is the nominal case used in most configurations, where the service
261 // provides the SMB.
262 PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_);
263 // TODO(primiano): handle mmap failure in case of OOM.
264 shared_memory_ =
265 PosixSharedMemory::AttachToFd(std::move(shmem_fd),
266 /*require_seals_if_supported=*/false);
267 shared_buffer_page_size_kb_ =
268 cmd.setup_tracing().shared_buffer_page_size_kb();
269 shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
270 shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
271 task_runner_);
272 if (direct_smb_patching_supported_)
273 shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService();
274 } else {
275 // Producer-provided SMB (used by Chrome for startup tracing).
276 PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ &&
277 shared_memory_arbiter_);
278 }
279 producer_->OnTracingSetup();
280 return;
281 }
282
283 if (cmd.has_flush()) {
284 // This cast boilerplate is required only because protobuf uses its own
285 // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
286 // type (long vs long long) even though they have the same size.
287 const auto* data_source_ids = cmd.flush().data_source_ids().data();
288 static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
289 "data_source_ids should be 64-bit");
290 producer_->Flush(
291 cmd.flush().request_id(),
292 reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
293 static_cast<size_t>(cmd.flush().data_source_ids().size()));
294 return;
295 }
296
297 if (cmd.has_clear_incremental_state()) {
298 const auto* data_source_ids =
299 cmd.clear_incremental_state().data_source_ids().data();
300 static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
301 "data_source_ids should be 64-bit");
302 producer_->ClearIncrementalState(
303 reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
304 static_cast<size_t>(
305 cmd.clear_incremental_state().data_source_ids().size()));
306 return;
307 }
308
309 PERFETTO_DFATAL("Unknown async request received from tracing service");
310 }
311
RegisterDataSource(const DataSourceDescriptor & descriptor)312 void ProducerIPCClientImpl::RegisterDataSource(
313 const DataSourceDescriptor& descriptor) {
314 PERFETTO_DCHECK_THREAD(thread_checker_);
315 if (!connected_) {
316 PERFETTO_DLOG(
317 "Cannot RegisterDataSource(), not connected to tracing service");
318 }
319 protos::gen::RegisterDataSourceRequest req;
320 *req.mutable_data_source_descriptor() = descriptor;
321 ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
322 async_response.Bind(
323 [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
324 if (!response)
325 PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
326 });
327 producer_port_.RegisterDataSource(req, std::move(async_response));
328 }
329
UnregisterDataSource(const std::string & name)330 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
331 PERFETTO_DCHECK_THREAD(thread_checker_);
332 if (!connected_) {
333 PERFETTO_DLOG(
334 "Cannot UnregisterDataSource(), not connected to tracing service");
335 return;
336 }
337 protos::gen::UnregisterDataSourceRequest req;
338 req.set_data_source_name(name);
339 producer_port_.UnregisterDataSource(
340 req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>());
341 }
342
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)343 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
344 uint32_t target_buffer) {
345 PERFETTO_DCHECK_THREAD(thread_checker_);
346 if (!connected_) {
347 PERFETTO_DLOG(
348 "Cannot RegisterTraceWriter(), not connected to tracing service");
349 return;
350 }
351 protos::gen::RegisterTraceWriterRequest req;
352 req.set_trace_writer_id(writer_id);
353 req.set_target_buffer(target_buffer);
354 producer_port_.RegisterTraceWriter(
355 req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>());
356 }
357
UnregisterTraceWriter(uint32_t writer_id)358 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
359 PERFETTO_DCHECK_THREAD(thread_checker_);
360 if (!connected_) {
361 PERFETTO_DLOG(
362 "Cannot UnregisterTraceWriter(), not connected to tracing service");
363 return;
364 }
365 protos::gen::UnregisterTraceWriterRequest req;
366 req.set_trace_writer_id(writer_id);
367 producer_port_.UnregisterTraceWriter(
368 req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>());
369 }
370
CommitData(const CommitDataRequest & req,CommitDataCallback callback)371 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
372 CommitDataCallback callback) {
373 PERFETTO_DCHECK_THREAD(thread_checker_);
374 if (!connected_) {
375 PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
376 return;
377 }
378 ipc::Deferred<protos::gen::CommitDataResponse> async_response;
379 // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
380 // this call and checks that the callback is dropped.
381 if (callback) {
382 async_response.Bind(
383 [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) {
384 if (!response) {
385 PERFETTO_DLOG("CommitData() failed: connection reset");
386 return;
387 }
388 callback();
389 });
390 }
391 producer_port_.CommitData(req, std::move(async_response));
392 }
393
NotifyDataSourceStarted(DataSourceInstanceID id)394 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
395 PERFETTO_DCHECK_THREAD(thread_checker_);
396 if (!connected_) {
397 PERFETTO_DLOG(
398 "Cannot NotifyDataSourceStarted(), not connected to tracing service");
399 return;
400 }
401 protos::gen::NotifyDataSourceStartedRequest req;
402 req.set_data_source_id(id);
403 producer_port_.NotifyDataSourceStarted(
404 req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>());
405 }
406
NotifyDataSourceStopped(DataSourceInstanceID id)407 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
408 PERFETTO_DCHECK_THREAD(thread_checker_);
409 if (!connected_) {
410 PERFETTO_DLOG(
411 "Cannot NotifyDataSourceStopped(), not connected to tracing service");
412 return;
413 }
414 protos::gen::NotifyDataSourceStoppedRequest req;
415 req.set_data_source_id(id);
416 producer_port_.NotifyDataSourceStopped(
417 req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>());
418 }
419
ActivateTriggers(const std::vector<std::string> & triggers)420 void ProducerIPCClientImpl::ActivateTriggers(
421 const std::vector<std::string>& triggers) {
422 PERFETTO_DCHECK_THREAD(thread_checker_);
423 if (!connected_) {
424 PERFETTO_DLOG(
425 "Cannot ActivateTriggers(), not connected to tracing service");
426 return;
427 }
428 protos::gen::ActivateTriggersRequest proto_req;
429 for (const auto& name : triggers) {
430 *proto_req.add_trigger_names() = name;
431 }
432 producer_port_.ActivateTriggers(
433 proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>());
434 }
435
Sync(std::function<void ()> callback)436 void ProducerIPCClientImpl::Sync(std::function<void()> callback) {
437 PERFETTO_DCHECK_THREAD(thread_checker_);
438 if (!connected_) {
439 pending_sync_reqs_.emplace_back(std::move(callback));
440 return;
441 }
442 ipc::Deferred<protos::gen::SyncResponse> resp;
443 resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) {
444 // Here we ACK the callback even if the service replies with a failure
445 // (i.e. the service is too old and doesn't understand Sync()). In that
446 // case the service has still seen the request, the IPC roundtrip is
447 // still a (weaker) linearization fence.
448 callback();
449 });
450 producer_port_.Sync(protos::gen::SyncRequest(), std::move(resp));
451 }
452
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)453 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
454 BufferID target_buffer,
455 BufferExhaustedPolicy buffer_exhausted_policy) {
456 // This method can be called by different threads. |shared_memory_arbiter_| is
457 // thread-safe but be aware of accessing any other state in this function.
458 return shared_memory_arbiter_->CreateTraceWriter(target_buffer,
459 buffer_exhausted_policy);
460 }
461
MaybeSharedMemoryArbiter()462 SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() {
463 return shared_memory_arbiter_.get();
464 }
465
IsShmemProvidedByProducer() const466 bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const {
467 return is_shmem_provided_by_producer_;
468 }
469
NotifyFlushComplete(FlushRequestID req_id)470 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
471 return shared_memory_arbiter_->NotifyFlushComplete(req_id);
472 }
473
shared_memory() const474 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
475 return shared_memory_.get();
476 }
477
shared_buffer_page_size_kb() const478 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
479 return shared_buffer_page_size_kb_;
480 }
481
482 } // namespace perfetto
483