1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18 
19 #include <inttypes.h>
20 #include <string.h>
21 
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/ipc/client.h"
24 #include "perfetto/ext/tracing/core/consumer.h"
25 #include "perfetto/ext/tracing/core/observable_events.h"
26 #include "perfetto/ext/tracing/core/trace_stats.h"
27 #include "perfetto/tracing/core/trace_config.h"
28 #include "perfetto/tracing/core/tracing_service_state.h"
29 
30 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
31 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
32 // Consumer* during the callbacks.
33 
34 namespace perfetto {
35 
36 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)37 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
38     const char* service_sock_name,
39     Consumer* consumer,
40     base::TaskRunner* task_runner) {
41   return std::unique_ptr<TracingService::ConsumerEndpoint>(
42       new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
43 }
44 
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)45 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
46                                              Consumer* consumer,
47                                              base::TaskRunner* task_runner)
48     : consumer_(consumer),
49       ipc_channel_(
50           ipc::Client::CreateInstance({service_sock_name, /*sock_retry=*/false},
51                                       task_runner)),
52       consumer_port_(this /* event_listener */),
53       weak_ptr_factory_(this) {
54   ipc_channel_->BindService(consumer_port_.GetWeakPtr());
55 }
56 
57 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
58 
59 // Called by the IPC layer if the BindService() succeeds.
OnConnect()60 void ConsumerIPCClientImpl::OnConnect() {
61   connected_ = true;
62   consumer_->OnConnect();
63 }
64 
OnDisconnect()65 void ConsumerIPCClientImpl::OnDisconnect() {
66   PERFETTO_DLOG("Tracing service connection failure");
67   connected_ = false;
68   consumer_->OnDisconnect();  // Note: may delete |this|.
69 }
70 
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)71 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
72                                           base::ScopedFile fd) {
73   if (!connected_) {
74     PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
75     return;
76   }
77 
78 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
79   if (fd) {
80     consumer_->OnTracingDisabled(
81         "Passing FDs for write_into_file is not supported on Windows");
82     return;
83   }
84 #endif
85 
86   protos::gen::EnableTracingRequest req;
87   *req.mutable_trace_config() = trace_config;
88   ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
89   auto weak_this = weak_ptr_factory_.GetWeakPtr();
90   async_response.Bind(
91       [weak_this](
92           ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
93         if (weak_this)
94           weak_this->OnEnableTracingResponse(std::move(response));
95       });
96 
97   // |fd| will be closed when this function returns, but it's fine because the
98   // IPC layer dup()'s it when sending the IPC.
99   consumer_port_.EnableTracing(req, std::move(async_response), *fd);
100 }
101 
ChangeTraceConfig(const TraceConfig & trace_config)102 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig& trace_config) {
103   if (!connected_) {
104     PERFETTO_DLOG(
105         "Cannot ChangeTraceConfig(), not connected to tracing service");
106     return;
107   }
108 
109   ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
110   async_response.Bind(
111       [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
112         if (!response)
113           PERFETTO_DLOG("ChangeTraceConfig() failed");
114       });
115   protos::gen::ChangeTraceConfigRequest req;
116   *req.mutable_trace_config() = trace_config;
117   consumer_port_.ChangeTraceConfig(req, std::move(async_response));
118 }
119 
StartTracing()120 void ConsumerIPCClientImpl::StartTracing() {
121   if (!connected_) {
122     PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
123     return;
124   }
125 
126   ipc::Deferred<protos::gen::StartTracingResponse> async_response;
127   async_response.Bind(
128       [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
129         if (!response)
130           PERFETTO_DLOG("StartTracing() failed");
131       });
132   protos::gen::StartTracingRequest req;
133   consumer_port_.StartTracing(req, std::move(async_response));
134 }
135 
DisableTracing()136 void ConsumerIPCClientImpl::DisableTracing() {
137   if (!connected_) {
138     PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
139     return;
140   }
141 
142   ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
143   async_response.Bind(
144       [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
145         if (!response)
146           PERFETTO_DLOG("DisableTracing() failed");
147       });
148   consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
149                                 std::move(async_response));
150 }
151 
ReadBuffers()152 void ConsumerIPCClientImpl::ReadBuffers() {
153   if (!connected_) {
154     PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
155     return;
156   }
157 
158   ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
159 
160   // The IPC layer guarantees that callbacks are destroyed after this object
161   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
162   // contract of this class expects the caller to not destroy the Consumer class
163   // before having destroyed this class. Hence binding |this| here is safe.
164   async_response.Bind(
165       [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
166         OnReadBuffersResponse(std::move(response));
167       });
168   consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
169                              std::move(async_response));
170 }
171 
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)172 void ConsumerIPCClientImpl::OnReadBuffersResponse(
173     ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
174   if (!response) {
175     PERFETTO_DLOG("ReadBuffers() failed");
176     return;
177   }
178   std::vector<TracePacket> trace_packets;
179   for (auto& resp_slice : response->slices()) {
180     const std::string& slice_data = resp_slice.data();
181     Slice slice = Slice::Allocate(slice_data.size());
182     memcpy(slice.own_data(), slice_data.data(), slice.size);
183     partial_packet_.AddSlice(std::move(slice));
184     if (resp_slice.last_slice_for_packet())
185       trace_packets.emplace_back(std::move(partial_packet_));
186   }
187   if (!trace_packets.empty() || !response.has_more())
188     consumer_->OnTraceData(std::move(trace_packets), response.has_more());
189 }
190 
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)191 void ConsumerIPCClientImpl::OnEnableTracingResponse(
192     ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
193   std::string error;
194   // |response| might be empty when the request gets rejected (if the connection
195   // with the service is dropped all outstanding requests are auto-rejected).
196   if (!response) {
197     error =
198         "EnableTracing IPC request rejected. This is likely due to a loss of "
199         "the traced connection";
200   } else {
201     error = response->error();
202   }
203   if (!response || response->disabled())
204     consumer_->OnTracingDisabled(error);
205 }
206 
FreeBuffers()207 void ConsumerIPCClientImpl::FreeBuffers() {
208   if (!connected_) {
209     PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
210     return;
211   }
212 
213   protos::gen::FreeBuffersRequest req;
214   ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
215   async_response.Bind(
216       [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
217         if (!response)
218           PERFETTO_DLOG("FreeBuffers() failed");
219       });
220   consumer_port_.FreeBuffers(req, std::move(async_response));
221 }
222 
Flush(uint32_t timeout_ms,FlushCallback callback)223 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
224   if (!connected_) {
225     PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
226     return callback(/*success=*/false);
227   }
228 
229   protos::gen::FlushRequest req;
230   req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
231   ipc::Deferred<protos::gen::FlushResponse> async_response;
232   async_response.Bind(
233       [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
234         callback(!!response);
235       });
236   consumer_port_.Flush(req, std::move(async_response));
237 }
238 
Detach(const std::string & key)239 void ConsumerIPCClientImpl::Detach(const std::string& key) {
240   if (!connected_) {
241     PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
242     return;
243   }
244 
245   protos::gen::DetachRequest req;
246   req.set_key(key);
247   ipc::Deferred<protos::gen::DetachResponse> async_response;
248   auto weak_this = weak_ptr_factory_.GetWeakPtr();
249 
250   async_response.Bind(
251       [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
252         if (weak_this)
253           weak_this->consumer_->OnDetach(!!response);
254       });
255   consumer_port_.Detach(req, std::move(async_response));
256 }
257 
Attach(const std::string & key)258 void ConsumerIPCClientImpl::Attach(const std::string& key) {
259   if (!connected_) {
260     PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
261     return;
262   }
263 
264   {
265     protos::gen::AttachRequest req;
266     req.set_key(key);
267     ipc::Deferred<protos::gen::AttachResponse> async_response;
268     auto weak_this = weak_ptr_factory_.GetWeakPtr();
269 
270     async_response.Bind(
271         [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
272           if (!weak_this)
273             return;
274           if (!response) {
275             weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
276             return;
277           }
278           const TraceConfig& trace_config = response->trace_config();
279 
280           // If attached succesfully, also attach to the end-of-trace
281           // notificaton callback, via EnableTracing(attach_notification_only).
282           protos::gen::EnableTracingRequest enable_req;
283           enable_req.set_attach_notification_only(true);
284           ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
285           enable_resp.Bind(
286               [weak_this](
287                   ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
288                 if (weak_this)
289                   weak_this->OnEnableTracingResponse(std::move(resp));
290               });
291           weak_this->consumer_port_.EnableTracing(enable_req,
292                                                   std::move(enable_resp));
293 
294           weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
295         });
296     consumer_port_.Attach(req, std::move(async_response));
297   }
298 }
299 
GetTraceStats()300 void ConsumerIPCClientImpl::GetTraceStats() {
301   if (!connected_) {
302     PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
303     return;
304   }
305 
306   protos::gen::GetTraceStatsRequest req;
307   ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
308 
309   // The IPC layer guarantees that callbacks are destroyed after this object
310   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
311   // contract of this class expects the caller to not destroy the Consumer class
312   // before having destroyed this class. Hence binding |this| here is safe.
313   async_response.Bind(
314       [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
315         if (!response) {
316           consumer_->OnTraceStats(/*success=*/false, TraceStats());
317           return;
318         }
319         consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
320       });
321   consumer_port_.GetTraceStats(req, std::move(async_response));
322 }
323 
ObserveEvents(uint32_t enabled_event_types)324 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
325   if (!connected_) {
326     PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
327     return;
328   }
329 
330   protos::gen::ObserveEventsRequest req;
331   for (uint32_t i = 0; i < 32; i++) {
332     const uint32_t event_id = 1u << i;
333     if (enabled_event_types & event_id)
334       req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
335   }
336 
337   ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
338   // The IPC layer guarantees that callbacks are destroyed after this object
339   // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
340   // contract of this class expects the caller to not destroy the Consumer class
341   // before having destroyed this class. Hence binding |this| here is safe.
342   async_response.Bind(
343       [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
344         // Skip empty response, which the service sends to close the stream.
345         if (!response.has_more()) {
346           PERFETTO_DCHECK(!response.success());
347           return;
348         }
349         consumer_->OnObservableEvents(response->events());
350       });
351   consumer_port_.ObserveEvents(req, std::move(async_response));
352 }
353 
QueryServiceState(QueryServiceStateCallback callback)354 void ConsumerIPCClientImpl::QueryServiceState(
355     QueryServiceStateCallback callback) {
356   if (!connected_) {
357     PERFETTO_DLOG(
358         "Cannot QueryServiceState(), not connected to tracing service");
359     return;
360   }
361 
362   auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
363                                            {std::move(callback), {}});
364   protos::gen::QueryServiceStateRequest req;
365   ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
366   auto weak_this = weak_ptr_factory_.GetWeakPtr();
367   async_response.Bind(
368       [weak_this,
369        it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
370         if (weak_this)
371           weak_this->OnQueryServiceStateResponse(std::move(response), it);
372       });
373   consumer_port_.QueryServiceState(req, std::move(async_response));
374 }
375 
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)376 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
377     ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
378     PendingQueryServiceRequests::iterator req_it) {
379   PERFETTO_DCHECK(req_it->callback);
380 
381   if (!response) {
382     auto callback = std::move(req_it->callback);
383     pending_query_svc_reqs_.erase(req_it);
384     callback(false, TracingServiceState());
385     return;
386   }
387 
388   // The QueryServiceState response can be split in several chunks if the
389   // service has several data sources. The client is supposed to merge all the
390   // replies. The easiest way to achieve this is to re-serialize the partial
391   // response and then re-decode the merged result in one shot.
392   std::vector<uint8_t>& merged_resp = req_it->merged_resp;
393   std::vector<uint8_t> part = response->service_state().SerializeAsArray();
394   merged_resp.insert(merged_resp.end(), part.begin(), part.end());
395 
396   if (response.has_more())
397     return;
398 
399   // All replies have been received. Decode the merged result and reply to the
400   // callback.
401   protos::gen::TracingServiceState svc_state;
402   bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
403   if (!ok)
404     PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
405   auto callback = std::move(req_it->callback);
406   pending_query_svc_reqs_.erase(req_it);
407   callback(ok, std::move(svc_state));
408 }
409 
QueryCapabilities(QueryCapabilitiesCallback callback)410 void ConsumerIPCClientImpl::QueryCapabilities(
411     QueryCapabilitiesCallback callback) {
412   if (!connected_) {
413     PERFETTO_DLOG(
414         "Cannot QueryCapabilities(), not connected to tracing service");
415     return;
416   }
417 
418   protos::gen::QueryCapabilitiesRequest req;
419   ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
420   async_response.Bind(
421       [callback](
422           ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
423         if (!response) {
424           // If the IPC fails, we are talking to an older version of the service
425           // that didn't support QueryCapabilities at all. In this case return
426           // an empty capabilities message.
427           callback(TracingServiceCapabilities());
428         } else {
429           callback(response->capabilities());
430         }
431       });
432   consumer_port_.QueryCapabilities(req, std::move(async_response));
433 }
434 
SaveTraceForBugreport(SaveTraceForBugreportCallback callback)435 void ConsumerIPCClientImpl::SaveTraceForBugreport(
436     SaveTraceForBugreportCallback callback) {
437   if (!connected_) {
438     PERFETTO_DLOG(
439         "Cannot SaveTraceForBugreport(), not connected to tracing service");
440     return;
441   }
442 
443   protos::gen::SaveTraceForBugreportRequest req;
444   ipc::Deferred<protos::gen::SaveTraceForBugreportResponse> async_response;
445   async_response.Bind(
446       [callback](ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>
447                      response) {
448         if (!response) {
449           // If the IPC fails, we are talking to an older version of the service
450           // that didn't support SaveTraceForBugreport at all.
451           callback(
452               false,
453               "The tracing service doesn't support SaveTraceForBugreport()");
454         } else {
455           callback(response->success(), response->msg());
456         }
457       });
458   consumer_port_.SaveTraceForBugreport(req, std::move(async_response));
459 }
460 
461 }  // namespace perfetto
462