1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18 
19 #include <inttypes.h>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/base/scoped_file.h"
24 #include "perfetto/ext/ipc/basic_types.h"
25 #include "perfetto/ext/ipc/host.h"
26 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
27 #include "perfetto/ext/tracing/core/slice.h"
28 #include "perfetto/ext/tracing/core/trace_packet.h"
29 #include "perfetto/ext/tracing/core/trace_stats.h"
30 #include "perfetto/ext/tracing/core/tracing_service.h"
31 #include "perfetto/tracing/core/trace_config.h"
32 #include "perfetto/tracing/core/tracing_service_capabilities.h"
33 #include "perfetto/tracing/core/tracing_service_state.h"
34 
35 namespace perfetto {
36 
ConsumerIPCService(TracingService * core_service)37 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
38     : core_service_(core_service), weak_ptr_factory_(this) {}
39 
40 ConsumerIPCService::~ConsumerIPCService() = default;
41 
42 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()43 ConsumerIPCService::GetConsumerForCurrentRequest() {
44   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
45   const uid_t uid = ipc::Service::client_info().uid();
46   PERFETTO_CHECK(ipc_client_id);
47   auto it = consumers_.find(ipc_client_id);
48   if (it == consumers_.end()) {
49     auto* remote_consumer = new RemoteConsumer();
50     consumers_[ipc_client_id].reset(remote_consumer);
51     remote_consumer->service_endpoint =
52         core_service_->ConnectConsumer(remote_consumer, uid);
53     return remote_consumer;
54   }
55   return it->second.get();
56 }
57 
58 // Called by the IPC layer.
OnClientDisconnected()59 void ConsumerIPCService::OnClientDisconnected() {
60   ipc::ClientID client_id = ipc::Service::client_info().client_id();
61   consumers_.erase(client_id);
62 }
63 
64 // Called by the IPC layer.
EnableTracing(const protos::gen::EnableTracingRequest & req,DeferredEnableTracingResponse resp)65 void ConsumerIPCService::EnableTracing(
66     const protos::gen::EnableTracingRequest& req,
67     DeferredEnableTracingResponse resp) {
68   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
69   if (req.attach_notification_only()) {
70     remote_consumer->enable_tracing_response = std::move(resp);
71     return;
72   }
73   const TraceConfig& trace_config = req.trace_config();
74   base::ScopedFile fd;
75   if (trace_config.write_into_file() && trace_config.output_path().empty())
76     fd = ipc::Service::TakeReceivedFD();
77   remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
78   remote_consumer->enable_tracing_response = std::move(resp);
79 }
80 
81 // Called by the IPC layer.
StartTracing(const protos::gen::StartTracingRequest &,DeferredStartTracingResponse resp)82 void ConsumerIPCService::StartTracing(const protos::gen::StartTracingRequest&,
83                                       DeferredStartTracingResponse resp) {
84   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
85   remote_consumer->service_endpoint->StartTracing();
86   resp.Resolve(ipc::AsyncResult<protos::gen::StartTracingResponse>::Create());
87 }
88 
89 // Called by the IPC layer.
ChangeTraceConfig(const protos::gen::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)90 void ConsumerIPCService::ChangeTraceConfig(
91     const protos::gen::ChangeTraceConfigRequest& req,
92     DeferredChangeTraceConfigResponse resp) {
93   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
94   remote_consumer->service_endpoint->ChangeTraceConfig(req.trace_config());
95   resp.Resolve(
96       ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse>::Create());
97 }
98 
99 // Called by the IPC layer.
DisableTracing(const protos::gen::DisableTracingRequest &,DeferredDisableTracingResponse resp)100 void ConsumerIPCService::DisableTracing(
101     const protos::gen::DisableTracingRequest&,
102     DeferredDisableTracingResponse resp) {
103   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
104   resp.Resolve(ipc::AsyncResult<protos::gen::DisableTracingResponse>::Create());
105 }
106 
107 // Called by the IPC layer.
ReadBuffers(const protos::gen::ReadBuffersRequest &,DeferredReadBuffersResponse resp)108 void ConsumerIPCService::ReadBuffers(const protos::gen::ReadBuffersRequest&,
109                                      DeferredReadBuffersResponse resp) {
110   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
111   remote_consumer->read_buffers_response = std::move(resp);
112   remote_consumer->service_endpoint->ReadBuffers();
113 }
114 
115 // Called by the IPC layer.
FreeBuffers(const protos::gen::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)116 void ConsumerIPCService::FreeBuffers(const protos::gen::FreeBuffersRequest&,
117                                      DeferredFreeBuffersResponse resp) {
118   GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
119   resp.Resolve(ipc::AsyncResult<protos::gen::FreeBuffersResponse>::Create());
120 }
121 
122 // Called by the IPC layer.
Flush(const protos::gen::FlushRequest & req,DeferredFlushResponse resp)123 void ConsumerIPCService::Flush(const protos::gen::FlushRequest& req,
124                                DeferredFlushResponse resp) {
125   auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
126                                             std::move(resp));
127   auto weak_this = weak_ptr_factory_.GetWeakPtr();
128   auto callback = [weak_this, it](bool success) {
129     if (weak_this)
130       weak_this->OnFlushCallback(success, std::move(it));
131   };
132   GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
133                                                           std::move(callback));
134 }
135 
136 // Called by the IPC layer.
Detach(const protos::gen::DetachRequest & req,DeferredDetachResponse resp)137 void ConsumerIPCService::Detach(const protos::gen::DetachRequest& req,
138                                 DeferredDetachResponse resp) {
139   // OnDetach() will resolve the |detach_response|.
140   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
141   remote_consumer->detach_response = std::move(resp);
142   remote_consumer->service_endpoint->Detach(req.key());
143 }
144 
145 // Called by the IPC layer.
Attach(const protos::gen::AttachRequest & req,DeferredAttachResponse resp)146 void ConsumerIPCService::Attach(const protos::gen::AttachRequest& req,
147                                 DeferredAttachResponse resp) {
148   // OnAttach() will resolve the |attach_response|.
149   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
150   remote_consumer->attach_response = std::move(resp);
151   remote_consumer->service_endpoint->Attach(req.key());
152 }
153 
154 // Called by the IPC layer.
GetTraceStats(const protos::gen::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)155 void ConsumerIPCService::GetTraceStats(const protos::gen::GetTraceStatsRequest&,
156                                        DeferredGetTraceStatsResponse resp) {
157   // OnTraceStats() will resolve the |get_trace_stats_response|.
158   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
159   remote_consumer->get_trace_stats_response = std::move(resp);
160   remote_consumer->service_endpoint->GetTraceStats();
161 }
162 
163 // Called by the IPC layer.
ObserveEvents(const protos::gen::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)164 void ConsumerIPCService::ObserveEvents(
165     const protos::gen::ObserveEventsRequest& req,
166     DeferredObserveEventsResponse resp) {
167   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
168 
169   // If there's a prior stream, close it so that client can clean it up.
170   remote_consumer->CloseObserveEventsResponseStream();
171 
172   remote_consumer->observe_events_response = std::move(resp);
173 
174   uint32_t events_mask = 0;
175   for (const auto& type : req.events_to_observe()) {
176     events_mask |= static_cast<uint32_t>(type);
177   }
178   remote_consumer->service_endpoint->ObserveEvents(events_mask);
179 
180   // If no events are to be observed, close the stream immediately so that the
181   // client can clean up.
182   if (events_mask == 0)
183     remote_consumer->CloseObserveEventsResponseStream();
184 }
185 
186 // Called by the IPC layer.
QueryServiceState(const protos::gen::QueryServiceStateRequest &,DeferredQueryServiceStateResponse resp)187 void ConsumerIPCService::QueryServiceState(
188     const protos::gen::QueryServiceStateRequest&,
189     DeferredQueryServiceStateResponse resp) {
190   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
191   auto it = pending_query_service_responses_.insert(
192       pending_query_service_responses_.end(), std::move(resp));
193   auto weak_this = weak_ptr_factory_.GetWeakPtr();
194   auto callback = [weak_this, it](bool success,
195                                   const TracingServiceState& svc_state) {
196     if (weak_this)
197       weak_this->OnQueryServiceCallback(success, svc_state, std::move(it));
198   };
199   remote_consumer->service_endpoint->QueryServiceState(callback);
200 }
201 
202 // Called by the service in response to service_endpoint->QueryServiceState().
OnQueryServiceCallback(bool success,const TracingServiceState & svc_state,PendingQuerySvcResponses::iterator pending_response_it)203 void ConsumerIPCService::OnQueryServiceCallback(
204     bool success,
205     const TracingServiceState& svc_state,
206     PendingQuerySvcResponses::iterator pending_response_it) {
207   DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
208   pending_query_service_responses_.erase(pending_response_it);
209   if (!success) {
210     response.Reject();
211     return;
212   }
213 
214   // The TracingServiceState object might be too big to fit into a single IPC
215   // message because it contains the DataSourceDescriptor of each data source.
216   // Here we split it in chunks to fit in the IPC limit, observing the
217   // following rule: each chunk must be invididually a valid TracingServiceState
218   // message; all the chunks concatenated together must form the original
219   // message. This is to deal with the legacy API that was just sending one
220   // whole message (failing in presence of too many data sources, b/153142114).
221   // The message is split as follows: we take the whole TracingServiceState,
222   // take out the data sources section (which is a top-level repeated field)
223   // and re-add them one-by-one. If, in the process of appending, the IPC msg
224   // size is reached, a new chunk is created. This assumes that the rest of
225   // TracingServiceState fits in one IPC message and each DataSourceDescriptor
226   // fits in the worst case in a dedicated message (which is true, because
227   // otherwise the RegisterDataSource() which passes the descriptor in the first
228   // place would fail).
229 
230   std::vector<uint8_t> chunked_reply;
231 
232   // Transmits the current chunk and starts a new one.
233   bool sent_eof = false;
234   auto send_chunked_reply = [&chunked_reply, &response,
235                              &sent_eof](bool has_more) {
236     PERFETTO_CHECK(!sent_eof);
237     sent_eof = !has_more;
238     auto resp =
239         ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
240     resp.set_has_more(has_more);
241     PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
242         chunked_reply.data(), chunked_reply.size()));
243     chunked_reply.clear();
244     response.Resolve(std::move(resp));
245   };
246 
247   // Create a copy of the whole response and cut away the data_sources section.
248   protos::gen::TracingServiceState svc_state_copy = svc_state;
249   auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
250   chunked_reply = svc_state_copy.SerializeAsArray();
251 
252   // Now re-add them fitting within the IPC message limits (- some margin for
253   // the outer IPC frame).
254   constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
255   for (const auto& data_source : data_sources) {
256     protos::gen::TracingServiceState tmp;
257     tmp.mutable_data_sources()->emplace_back(std::move(data_source));
258     std::vector<uint8_t> chunk = tmp.SerializeAsArray();
259     if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
260       chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
261     } else {
262       send_chunked_reply(/*has_more=*/true);
263       chunked_reply = std::move(chunk);
264     }
265   }
266 
267   PERFETTO_DCHECK(!chunked_reply.empty());
268   send_chunked_reply(/*has_more=*/false);
269   PERFETTO_CHECK(sent_eof);
270 }
271 
272 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)273 void ConsumerIPCService::OnFlushCallback(
274     bool success,
275     PendingFlushResponses::iterator pending_response_it) {
276   DeferredFlushResponse response(std::move(*pending_response_it));
277   pending_flush_responses_.erase(pending_response_it);
278   if (success) {
279     response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
280   } else {
281     response.Reject();
282   }
283 }
284 
QueryCapabilities(const protos::gen::QueryCapabilitiesRequest &,DeferredQueryCapabilitiesResponse resp)285 void ConsumerIPCService::QueryCapabilities(
286     const protos::gen::QueryCapabilitiesRequest&,
287     DeferredQueryCapabilitiesResponse resp) {
288   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
289   auto it = pending_query_capabilities_responses_.insert(
290       pending_query_capabilities_responses_.end(), std::move(resp));
291   auto weak_this = weak_ptr_factory_.GetWeakPtr();
292   auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
293     if (weak_this)
294       weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
295   };
296   remote_consumer->service_endpoint->QueryCapabilities(callback);
297 }
298 
299 // Called by the service in response to service_endpoint->QueryCapabilities().
OnQueryCapabilitiesCallback(const TracingServiceCapabilities & caps,PendingQueryCapabilitiesResponses::iterator pending_response_it)300 void ConsumerIPCService::OnQueryCapabilitiesCallback(
301     const TracingServiceCapabilities& caps,
302     PendingQueryCapabilitiesResponses::iterator pending_response_it) {
303   DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
304   pending_query_capabilities_responses_.erase(pending_response_it);
305   auto resp =
306       ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
307   *resp->mutable_capabilities() = caps;
308   response.Resolve(std::move(resp));
309 }
310 
SaveTraceForBugreport(const protos::gen::SaveTraceForBugreportRequest &,DeferredSaveTraceForBugreportResponse resp)311 void ConsumerIPCService::SaveTraceForBugreport(
312     const protos::gen::SaveTraceForBugreportRequest&,
313     DeferredSaveTraceForBugreportResponse resp) {
314   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
315   auto it = pending_bugreport_responses_.insert(
316       pending_bugreport_responses_.end(), std::move(resp));
317   auto weak_this = weak_ptr_factory_.GetWeakPtr();
318   auto callback = [weak_this, it](bool success, const std::string& msg) {
319     if (weak_this)
320       weak_this->OnSaveTraceForBugreportCallback(success, msg, std::move(it));
321   };
322   remote_consumer->service_endpoint->SaveTraceForBugreport(callback);
323 }
324 
325 // Called by the service in response to
326 // service_endpoint->SaveTraceForBugreport().
OnSaveTraceForBugreportCallback(bool success,const std::string & msg,PendingSaveTraceForBugreportResponses::iterator pending_response_it)327 void ConsumerIPCService::OnSaveTraceForBugreportCallback(
328     bool success,
329     const std::string& msg,
330     PendingSaveTraceForBugreportResponses::iterator pending_response_it) {
331   DeferredSaveTraceForBugreportResponse response(
332       std::move(*pending_response_it));
333   pending_bugreport_responses_.erase(pending_response_it);
334   auto resp =
335       ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>::Create();
336   resp->set_success(success);
337   resp->set_msg(msg);
338   response.Resolve(std::move(resp));
339 }
340 
341 ////////////////////////////////////////////////////////////////////////////////
342 // RemoteConsumer methods
343 ////////////////////////////////////////////////////////////////////////////////
344 
345 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
346 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
347 
348 // Invoked by the |core_service_| business logic after the ConnectConsumer()
349 // call. There is nothing to do here, we really expected the ConnectConsumer()
350 // to just work in the local case.
OnConnect()351 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
352 
353 // Invoked by the |core_service_| business logic after we destroy the
354 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()355 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
356 
OnTracingDisabled(const std::string & error)357 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled(
358     const std::string& error) {
359   if (enable_tracing_response.IsBound()) {
360     auto result =
361         ipc::AsyncResult<protos::gen::EnableTracingResponse>::Create();
362     result->set_disabled(true);
363     if (!error.empty())
364       result->set_error(error);
365     enable_tracing_response.Resolve(std::move(result));
366   }
367 }
368 
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)369 void ConsumerIPCService::RemoteConsumer::OnTraceData(
370     std::vector<TracePacket> trace_packets,
371     bool has_more) {
372   if (!read_buffers_response.IsBound())
373     return;
374 
375   auto result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
376 
377   // A TracePacket might be too big to fit into a single IPC message (max
378   // kIPCBufferSize). However a TracePacket is made of slices and each slice
379   // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
380   // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
381   // if its slices don't fit within one IPC, chunk them over several contiguous
382   // IPCs using the |last_slice_for_packet| for glueing on the other side.
383   static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
384                 "kIPCBufferSize too small given the max possible slice size");
385 
386   auto send_ipc_reply = [this, &result](bool more) {
387     result.set_has_more(more);
388     read_buffers_response.Resolve(std::move(result));
389     result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
390   };
391 
392   size_t approx_reply_size = 0;
393   for (const TracePacket& trace_packet : trace_packets) {
394     size_t num_slices_left_for_packet = trace_packet.slices().size();
395     for (const Slice& slice : trace_packet.slices()) {
396       // Check if this slice would cause the IPC to overflow its max size and,
397       // if that is the case, split the IPCs. The "16" and "64" below are
398       // over-estimations of, respectively:
399       // 16: the preamble that prefixes each slice (there are 2 x size fields
400       //     in the proto + the |last_slice_for_packet| bool).
401       // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
402       // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
403       // will hit a DCHECK anyways.
404       const size_t approx_slice_size = slice.size + 16;
405       if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
406         // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
407         PERFETTO_CHECK(result->slices_size() > 0);
408         send_ipc_reply(/*has_more=*/true);
409         approx_reply_size = 0;
410       }
411       approx_reply_size += approx_slice_size;
412 
413       auto* res_slice = result->add_slices();
414       res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
415       res_slice->set_data(slice.start, slice.size);
416     }
417   }
418   send_ipc_reply(has_more);
419 }
420 
OnDetach(bool success)421 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
422   if (!success) {
423     std::move(detach_response).Reject();
424     return;
425   }
426   auto resp = ipc::AsyncResult<protos::gen::DetachResponse>::Create();
427   std::move(detach_response).Resolve(std::move(resp));
428 }
429 
OnAttach(bool success,const TraceConfig & trace_config)430 void ConsumerIPCService::RemoteConsumer::OnAttach(
431     bool success,
432     const TraceConfig& trace_config) {
433   if (!success) {
434     std::move(attach_response).Reject();
435     return;
436   }
437   auto response = ipc::AsyncResult<protos::gen::AttachResponse>::Create();
438   *response->mutable_trace_config() = trace_config;
439   std::move(attach_response).Resolve(std::move(response));
440 }
441 
OnTraceStats(bool success,const TraceStats & stats)442 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
443                                                       const TraceStats& stats) {
444   if (!success) {
445     std::move(get_trace_stats_response).Reject();
446     return;
447   }
448   auto response =
449       ipc::AsyncResult<protos::gen::GetTraceStatsResponse>::Create();
450   *response->mutable_trace_stats() = stats;
451   std::move(get_trace_stats_response).Resolve(std::move(response));
452 }
453 
OnObservableEvents(const ObservableEvents & events)454 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
455     const ObservableEvents& events) {
456   if (!observe_events_response.IsBound())
457     return;
458 
459   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
460   result.set_has_more(true);
461   *result->mutable_events() = events;
462   observe_events_response.Resolve(std::move(result));
463 }
464 
CloseObserveEventsResponseStream()465 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
466   if (!observe_events_response.IsBound())
467     return;
468 
469   auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
470   result.set_has_more(false);
471   observe_events_response.Resolve(std::move(result));
472 }
473 
474 }  // namespace perfetto
475