1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/logging.h"
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ext/base/version.h"
25 #include "perfetto/ext/ipc/client.h"
26 #include "perfetto/ext/tracing/core/commit_data_request.h"
27 #include "perfetto/ext/tracing/core/producer.h"
28 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
29 #include "perfetto/ext/tracing/core/trace_writer.h"
30 #include "perfetto/tracing/core/data_source_config.h"
31 #include "perfetto/tracing/core/data_source_descriptor.h"
32 #include "perfetto/tracing/core/trace_config.h"
33 #include "src/tracing/ipc/posix_shared_memory.h"
34 
// TODO(fmayer): think about what happens when ProducerIPCClientImpl gets
// destroyed w.r.t. the Producer pointer. Also think about the lifetime of the
// Producer* during the callbacks.
38 
39 namespace perfetto {
40 
41 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(const char * service_sock_name,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter,ConnectionFlags conn_flags)42 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
43     const char* service_sock_name,
44     Producer* producer,
45     const std::string& producer_name,
46     base::TaskRunner* task_runner,
47     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
48     size_t shared_memory_size_hint_bytes,
49     size_t shared_memory_page_size_hint_bytes,
50     std::unique_ptr<SharedMemory> shm,
51     std::unique_ptr<SharedMemoryArbiter> shm_arbiter,
52     ConnectionFlags conn_flags) {
53   return std::unique_ptr<TracingService::ProducerEndpoint>(
54       new ProducerIPCClientImpl(
55           {service_sock_name,
56            conn_flags ==
57                ProducerIPCClient::ConnectionFlags::kRetryIfUnreachable},
58           producer, producer_name, task_runner, smb_scraping_mode,
59           shared_memory_size_hint_bytes, shared_memory_page_size_hint_bytes,
60           std::move(shm), std::move(shm_arbiter)));
61 }
62 
63 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
Connect(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)64 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
65     ipc::Client::ConnArgs conn_args,
66     Producer* producer,
67     const std::string& producer_name,
68     base::TaskRunner* task_runner,
69     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
70     size_t shared_memory_size_hint_bytes,
71     size_t shared_memory_page_size_hint_bytes,
72     std::unique_ptr<SharedMemory> shm,
73     std::unique_ptr<SharedMemoryArbiter> shm_arbiter) {
74   return std::unique_ptr<TracingService::ProducerEndpoint>(
75       new ProducerIPCClientImpl(std::move(conn_args), producer, producer_name,
76                                 task_runner, smb_scraping_mode,
77                                 shared_memory_size_hint_bytes,
78                                 shared_memory_page_size_hint_bytes,
79                                 std::move(shm), std::move(shm_arbiter)));
80 }
81 
ProducerIPCClientImpl(ipc::Client::ConnArgs conn_args,Producer * producer,const std::string & producer_name,base::TaskRunner * task_runner,TracingService::ProducerSMBScrapingMode smb_scraping_mode,size_t shared_memory_size_hint_bytes,size_t shared_memory_page_size_hint_bytes,std::unique_ptr<SharedMemory> shm,std::unique_ptr<SharedMemoryArbiter> shm_arbiter)82 ProducerIPCClientImpl::ProducerIPCClientImpl(
83     ipc::Client::ConnArgs conn_args,
84     Producer* producer,
85     const std::string& producer_name,
86     base::TaskRunner* task_runner,
87     TracingService::ProducerSMBScrapingMode smb_scraping_mode,
88     size_t shared_memory_size_hint_bytes,
89     size_t shared_memory_page_size_hint_bytes,
90     std::unique_ptr<SharedMemory> shm,
91     std::unique_ptr<SharedMemoryArbiter> shm_arbiter)
92     : producer_(producer),
93       task_runner_(task_runner),
94       ipc_channel_(
95           ipc::Client::CreateInstance(std::move(conn_args), task_runner)),
96       producer_port_(this /* event_listener */),
97       shared_memory_(std::move(shm)),
98       shared_memory_arbiter_(std::move(shm_arbiter)),
99       name_(producer_name),
100       shared_memory_page_size_hint_bytes_(shared_memory_page_size_hint_bytes),
101       shared_memory_size_hint_bytes_(shared_memory_size_hint_bytes),
102       smb_scraping_mode_(smb_scraping_mode) {
103   // Check for producer-provided SMB (used by Chrome for startup tracing).
104   if (shared_memory_) {
105     // We also expect a valid (unbound) arbiter. Bind it to this endpoint now.
106     PERFETTO_CHECK(shared_memory_arbiter_);
107     shared_memory_arbiter_->BindToProducerEndpoint(this, task_runner_);
108 
109     // If the service accepts our SMB, then it must match our requested page
110     // layout. The protocol doesn't allow the service to change the size and
111     // layout when the SMB is provided by the producer.
112     shared_buffer_page_size_kb_ = shared_memory_page_size_hint_bytes_ / 1024;
113   }
114 
115   ipc_channel_->BindService(producer_port_.GetWeakPtr());
116   PERFETTO_DCHECK_THREAD(thread_checker_);
117 }
118 
~ProducerIPCClientImpl()119 ProducerIPCClientImpl::~ProducerIPCClientImpl() {
120   PERFETTO_DCHECK_THREAD(thread_checker_);
121 }
122 
123 // Called by the IPC layer if the BindService() succeeds.
OnConnect()124 void ProducerIPCClientImpl::OnConnect() {
125   PERFETTO_DCHECK_THREAD(thread_checker_);
126   connected_ = true;
127 
128   // The IPC layer guarantees that any outstanding callback will be dropped on
129   // the floor if producer_port_ is destroyed between the request and the reply.
130   // Binding |this| is hence safe.
131   ipc::Deferred<protos::gen::InitializeConnectionResponse> on_init;
132   on_init.Bind(
133       [this](ipc::AsyncResult<protos::gen::InitializeConnectionResponse> resp) {
134         OnConnectionInitialized(
135             resp.success(),
136             resp.success() ? resp->using_shmem_provided_by_producer() : false,
137             resp.success() ? resp->direct_smb_patching_supported() : false);
138       });
139   protos::gen::InitializeConnectionRequest req;
140   req.set_producer_name(name_);
141   req.set_shared_memory_size_hint_bytes(
142       static_cast<uint32_t>(shared_memory_size_hint_bytes_));
143   req.set_shared_memory_page_size_hint_bytes(
144       static_cast<uint32_t>(shared_memory_page_size_hint_bytes_));
145   switch (smb_scraping_mode_) {
146     case TracingService::ProducerSMBScrapingMode::kDefault:
147       // No need to set the mode, it defaults to use the service default if
148       // unspecified.
149       break;
150     case TracingService::ProducerSMBScrapingMode::kEnabled:
151       req.set_smb_scraping_mode(
152           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
153       break;
154     case TracingService::ProducerSMBScrapingMode::kDisabled:
155       req.set_smb_scraping_mode(
156           protos::gen::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
157       break;
158   }
159 
160   int shm_fd = -1;
161   if (shared_memory_) {
162     shm_fd = static_cast<PosixSharedMemory*>(shared_memory_.get())->fd();
163     req.set_producer_provided_shmem(true);
164   }
165 
166 #if PERFETTO_DCHECK_IS_ON()
167   req.set_build_flags(
168       protos::gen::InitializeConnectionRequest::BUILD_FLAGS_DCHECKS_ON);
169 #else
170   req.set_build_flags(
171       protos::gen::InitializeConnectionRequest::BUILD_FLAGS_DCHECKS_OFF);
172 #endif
173   req.set_sdk_version(base::GetVersionString());
174   producer_port_.InitializeConnection(req, std::move(on_init), shm_fd);
175 
176   // Create the back channel to receive commands from the Service.
177   ipc::Deferred<protos::gen::GetAsyncCommandResponse> on_cmd;
178   on_cmd.Bind(
179       [this](ipc::AsyncResult<protos::gen::GetAsyncCommandResponse> resp) {
180         if (!resp)
181           return;  // The IPC channel was closed and |resp| was auto-rejected.
182         OnServiceRequest(*resp);
183       });
184   producer_port_.GetAsyncCommand(protos::gen::GetAsyncCommandRequest(),
185                                  std::move(on_cmd));
186 
187   // If there are pending Sync() requests, send them now.
188   for (const auto& pending_sync : pending_sync_reqs_)
189     Sync(std::move(pending_sync));
190   pending_sync_reqs_.clear();
191 }
192 
OnDisconnect()193 void ProducerIPCClientImpl::OnDisconnect() {
194   PERFETTO_DCHECK_THREAD(thread_checker_);
195   PERFETTO_DLOG("Tracing service connection failure");
196   connected_ = false;
197   data_sources_setup_.clear();
198   producer_->OnDisconnect();  // Note: may delete |this|.
199 }
200 
OnConnectionInitialized(bool connection_succeeded,bool using_shmem_provided_by_producer,bool direct_smb_patching_supported)201 void ProducerIPCClientImpl::OnConnectionInitialized(
202     bool connection_succeeded,
203     bool using_shmem_provided_by_producer,
204     bool direct_smb_patching_supported) {
205   PERFETTO_DCHECK_THREAD(thread_checker_);
206   // If connection_succeeded == false, the OnDisconnect() call will follow next
207   // and there we'll notify the |producer_|. TODO: add a test for this.
208   if (!connection_succeeded)
209     return;
210   is_shmem_provided_by_producer_ = using_shmem_provided_by_producer;
211   direct_smb_patching_supported_ = direct_smb_patching_supported;
212   producer_->OnConnect();
213 
214   // Bail out if the service failed to adopt our producer-allocated SMB.
215   // TODO(eseckler): Handle adoption failure more gracefully.
216   if (shared_memory_ && !is_shmem_provided_by_producer_) {
217     PERFETTO_DLOG("Service failed adopt producer-provided SMB, disconnecting.");
218     ipc_channel_.reset();
219     return;
220   }
221 }
222 
OnServiceRequest(const protos::gen::GetAsyncCommandResponse & cmd)223 void ProducerIPCClientImpl::OnServiceRequest(
224     const protos::gen::GetAsyncCommandResponse& cmd) {
225   PERFETTO_DCHECK_THREAD(thread_checker_);
226 
227   // This message is sent only when connecting to a service running Android Q+.
228   // See comment below in kStartDataSource.
229   if (cmd.has_setup_data_source()) {
230     const auto& req = cmd.setup_data_source();
231     const DataSourceInstanceID dsid = req.new_instance_id();
232     data_sources_setup_.insert(dsid);
233     producer_->SetupDataSource(dsid, req.config());
234     return;
235   }
236 
237   if (cmd.has_start_data_source()) {
238     const auto& req = cmd.start_data_source();
239     const DataSourceInstanceID dsid = req.new_instance_id();
240     const DataSourceConfig& cfg = req.config();
241     if (!data_sources_setup_.count(dsid)) {
242       // When connecting with an older (Android P) service, the service will not
243       // send a SetupDataSource message. We synthesize it here in that case.
244       producer_->SetupDataSource(dsid, cfg);
245     }
246     producer_->StartDataSource(dsid, cfg);
247     return;
248   }
249 
250   if (cmd.has_stop_data_source()) {
251     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
252     producer_->StopDataSource(dsid);
253     data_sources_setup_.erase(dsid);
254     return;
255   }
256 
257   if (cmd.has_setup_tracing()) {
258     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
259     if (shmem_fd) {
260       // This is the nominal case used in most configurations, where the service
261       // provides the SMB.
262       PERFETTO_CHECK(!is_shmem_provided_by_producer_ && !shared_memory_);
263       // TODO(primiano): handle mmap failure in case of OOM.
264       shared_memory_ =
265           PosixSharedMemory::AttachToFd(std::move(shmem_fd),
266                                         /*require_seals_if_supported=*/false);
267       shared_buffer_page_size_kb_ =
268           cmd.setup_tracing().shared_buffer_page_size_kb();
269       shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
270           shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
271           task_runner_);
272       if (direct_smb_patching_supported_)
273         shared_memory_arbiter_->SetDirectSMBPatchingSupportedByService();
274     } else {
275       // Producer-provided SMB (used by Chrome for startup tracing).
276       PERFETTO_CHECK(is_shmem_provided_by_producer_ && shared_memory_ &&
277                      shared_memory_arbiter_);
278     }
279     producer_->OnTracingSetup();
280     return;
281   }
282 
283   if (cmd.has_flush()) {
284     // This cast boilerplate is required only because protobuf uses its own
285     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
286     // type (long vs long long) even though they have the same size.
287     const auto* data_source_ids = cmd.flush().data_source_ids().data();
288     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
289                   "data_source_ids should be 64-bit");
290     producer_->Flush(
291         cmd.flush().request_id(),
292         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
293         static_cast<size_t>(cmd.flush().data_source_ids().size()));
294     return;
295   }
296 
297   if (cmd.has_clear_incremental_state()) {
298     const auto* data_source_ids =
299         cmd.clear_incremental_state().data_source_ids().data();
300     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
301                   "data_source_ids should be 64-bit");
302     producer_->ClearIncrementalState(
303         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
304         static_cast<size_t>(
305             cmd.clear_incremental_state().data_source_ids().size()));
306     return;
307   }
308 
309   PERFETTO_DFATAL("Unknown async request received from tracing service");
310 }
311 
RegisterDataSource(const DataSourceDescriptor & descriptor)312 void ProducerIPCClientImpl::RegisterDataSource(
313     const DataSourceDescriptor& descriptor) {
314   PERFETTO_DCHECK_THREAD(thread_checker_);
315   if (!connected_) {
316     PERFETTO_DLOG(
317         "Cannot RegisterDataSource(), not connected to tracing service");
318   }
319   protos::gen::RegisterDataSourceRequest req;
320   *req.mutable_data_source_descriptor() = descriptor;
321   ipc::Deferred<protos::gen::RegisterDataSourceResponse> async_response;
322   async_response.Bind(
323       [](ipc::AsyncResult<protos::gen::RegisterDataSourceResponse> response) {
324         if (!response)
325           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
326       });
327   producer_port_.RegisterDataSource(req, std::move(async_response));
328 }
329 
UnregisterDataSource(const std::string & name)330 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
331   PERFETTO_DCHECK_THREAD(thread_checker_);
332   if (!connected_) {
333     PERFETTO_DLOG(
334         "Cannot UnregisterDataSource(), not connected to tracing service");
335     return;
336   }
337   protos::gen::UnregisterDataSourceRequest req;
338   req.set_data_source_name(name);
339   producer_port_.UnregisterDataSource(
340       req, ipc::Deferred<protos::gen::UnregisterDataSourceResponse>());
341 }
342 
RegisterTraceWriter(uint32_t writer_id,uint32_t target_buffer)343 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
344                                                 uint32_t target_buffer) {
345   PERFETTO_DCHECK_THREAD(thread_checker_);
346   if (!connected_) {
347     PERFETTO_DLOG(
348         "Cannot RegisterTraceWriter(), not connected to tracing service");
349     return;
350   }
351   protos::gen::RegisterTraceWriterRequest req;
352   req.set_trace_writer_id(writer_id);
353   req.set_target_buffer(target_buffer);
354   producer_port_.RegisterTraceWriter(
355       req, ipc::Deferred<protos::gen::RegisterTraceWriterResponse>());
356 }
357 
UnregisterTraceWriter(uint32_t writer_id)358 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
359   PERFETTO_DCHECK_THREAD(thread_checker_);
360   if (!connected_) {
361     PERFETTO_DLOG(
362         "Cannot UnregisterTraceWriter(), not connected to tracing service");
363     return;
364   }
365   protos::gen::UnregisterTraceWriterRequest req;
366   req.set_trace_writer_id(writer_id);
367   producer_port_.UnregisterTraceWriter(
368       req, ipc::Deferred<protos::gen::UnregisterTraceWriterResponse>());
369 }
370 
CommitData(const CommitDataRequest & req,CommitDataCallback callback)371 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
372                                        CommitDataCallback callback) {
373   PERFETTO_DCHECK_THREAD(thread_checker_);
374   if (!connected_) {
375     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
376     return;
377   }
378   ipc::Deferred<protos::gen::CommitDataResponse> async_response;
379   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
380   // this call and checks that the callback is dropped.
381   if (callback) {
382     async_response.Bind(
383         [callback](ipc::AsyncResult<protos::gen::CommitDataResponse> response) {
384           if (!response) {
385             PERFETTO_DLOG("CommitData() failed: connection reset");
386             return;
387           }
388           callback();
389         });
390   }
391   producer_port_.CommitData(req, std::move(async_response));
392 }
393 
NotifyDataSourceStarted(DataSourceInstanceID id)394 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
395   PERFETTO_DCHECK_THREAD(thread_checker_);
396   if (!connected_) {
397     PERFETTO_DLOG(
398         "Cannot NotifyDataSourceStarted(), not connected to tracing service");
399     return;
400   }
401   protos::gen::NotifyDataSourceStartedRequest req;
402   req.set_data_source_id(id);
403   producer_port_.NotifyDataSourceStarted(
404       req, ipc::Deferred<protos::gen::NotifyDataSourceStartedResponse>());
405 }
406 
NotifyDataSourceStopped(DataSourceInstanceID id)407 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
408   PERFETTO_DCHECK_THREAD(thread_checker_);
409   if (!connected_) {
410     PERFETTO_DLOG(
411         "Cannot NotifyDataSourceStopped(), not connected to tracing service");
412     return;
413   }
414   protos::gen::NotifyDataSourceStoppedRequest req;
415   req.set_data_source_id(id);
416   producer_port_.NotifyDataSourceStopped(
417       req, ipc::Deferred<protos::gen::NotifyDataSourceStoppedResponse>());
418 }
419 
ActivateTriggers(const std::vector<std::string> & triggers)420 void ProducerIPCClientImpl::ActivateTriggers(
421     const std::vector<std::string>& triggers) {
422   PERFETTO_DCHECK_THREAD(thread_checker_);
423   if (!connected_) {
424     PERFETTO_DLOG(
425         "Cannot ActivateTriggers(), not connected to tracing service");
426     return;
427   }
428   protos::gen::ActivateTriggersRequest proto_req;
429   for (const auto& name : triggers) {
430     *proto_req.add_trigger_names() = name;
431   }
432   producer_port_.ActivateTriggers(
433       proto_req, ipc::Deferred<protos::gen::ActivateTriggersResponse>());
434 }
435 
Sync(std::function<void ()> callback)436 void ProducerIPCClientImpl::Sync(std::function<void()> callback) {
437   PERFETTO_DCHECK_THREAD(thread_checker_);
438   if (!connected_) {
439     pending_sync_reqs_.emplace_back(std::move(callback));
440     return;
441   }
442   ipc::Deferred<protos::gen::SyncResponse> resp;
443   resp.Bind([callback](ipc::AsyncResult<protos::gen::SyncResponse>) {
444     // Here we ACK the callback even if the service replies with a failure
445     // (i.e. the service is too old and doesn't understand Sync()). In that
446     // case the service has still seen the request, the IPC roundtrip is
447     // still a (weaker) linearization fence.
448     callback();
449   });
450   producer_port_.Sync(protos::gen::SyncRequest(), std::move(resp));
451 }
452 
CreateTraceWriter(BufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)453 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
454     BufferID target_buffer,
455     BufferExhaustedPolicy buffer_exhausted_policy) {
456   // This method can be called by different threads. |shared_memory_arbiter_| is
457   // thread-safe but be aware of accessing any other state in this function.
458   return shared_memory_arbiter_->CreateTraceWriter(target_buffer,
459                                                    buffer_exhausted_policy);
460 }
461 
MaybeSharedMemoryArbiter()462 SharedMemoryArbiter* ProducerIPCClientImpl::MaybeSharedMemoryArbiter() {
463   return shared_memory_arbiter_.get();
464 }
465 
IsShmemProvidedByProducer() const466 bool ProducerIPCClientImpl::IsShmemProvidedByProducer() const {
467   return is_shmem_provided_by_producer_;
468 }
469 
NotifyFlushComplete(FlushRequestID req_id)470 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
471   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
472 }
473 
shared_memory() const474 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
475   return shared_memory_.get();
476 }
477 
shared_buffer_page_size_kb() const478 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
479   return shared_buffer_page_size_kb_;
480 }
481 
482 }  // namespace perfetto
483