1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18 
19 #include <inttypes.h>
20 
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/scoped_file.h"
23 #include "perfetto/base/task_runner.h"
24 #include "perfetto/ipc/basic_types.h"
25 #include "perfetto/ipc/host.h"
26 #include "perfetto/tracing/core/shared_memory_abi.h"
27 #include "perfetto/tracing/core/slice.h"
28 #include "perfetto/tracing/core/trace_config.h"
29 #include "perfetto/tracing/core/trace_packet.h"
30 #include "perfetto/tracing/core/trace_stats.h"
31 #include "perfetto/tracing/core/tracing_service.h"
32 
33 namespace perfetto {
34 
ConsumerIPCService(TracingService * core_service)35 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
36     : core_service_(core_service), weak_ptr_factory_(this) {}
37 
38 ConsumerIPCService::~ConsumerIPCService() = default;
39 
40 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()41 ConsumerIPCService::GetConsumerForCurrentRequest() {
42   const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
43   const uid_t uid = ipc::Service::client_info().uid();
44   PERFETTO_CHECK(ipc_client_id);
45   auto it = consumers_.find(ipc_client_id);
46   if (it == consumers_.end()) {
47     auto* remote_consumer = new RemoteConsumer();
48     consumers_[ipc_client_id].reset(remote_consumer);
49     remote_consumer->service_endpoint =
50         core_service_->ConnectConsumer(remote_consumer, uid);
51     return remote_consumer;
52   }
53   return it->second.get();
54 }
55 
56 // Called by the IPC layer.
OnClientDisconnected()57 void ConsumerIPCService::OnClientDisconnected() {
58   ipc::ClientID client_id = ipc::Service::client_info().client_id();
59   consumers_.erase(client_id);
60 }
61 
62 // Called by the IPC layer.
EnableTracing(const protos::EnableTracingRequest & req,DeferredEnableTracingResponse resp)63 void ConsumerIPCService::EnableTracing(const protos::EnableTracingRequest& req,
64                                        DeferredEnableTracingResponse resp) {
65   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
66   if (req.attach_notification_only()) {
67     remote_consumer->enable_tracing_response = std::move(resp);
68     return;
69   }
70   TraceConfig trace_config;
71   trace_config.FromProto(req.trace_config());
72   base::ScopedFile fd;
73   if (trace_config.write_into_file())
74     fd = ipc::Service::TakeReceivedFD();
75   remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
76   remote_consumer->enable_tracing_response = std::move(resp);
77 }
78 
79 // Called by the IPC layer.
StartTracing(const protos::StartTracingRequest &,DeferredStartTracingResponse resp)80 void ConsumerIPCService::StartTracing(const protos::StartTracingRequest&,
81                                       DeferredStartTracingResponse resp) {
82   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
83   remote_consumer->service_endpoint->StartTracing();
84   resp.Resolve(ipc::AsyncResult<protos::StartTracingResponse>::Create());
85 }
86 
87 // Called by the IPC layer.
ChangeTraceConfig(const protos::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)88 void ConsumerIPCService::ChangeTraceConfig(
89     const protos::ChangeTraceConfigRequest& req,
90     DeferredChangeTraceConfigResponse resp) {
91   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
92   TraceConfig trace_config;
93   trace_config.FromProto(req.trace_config());
94   remote_consumer->service_endpoint->ChangeTraceConfig(trace_config);
95   resp.Resolve(ipc::AsyncResult<protos::ChangeTraceConfigResponse>::Create());
96 }
97 
98 // Called by the IPC layer.
DisableTracing(const protos::DisableTracingRequest &,DeferredDisableTracingResponse resp)99 void ConsumerIPCService::DisableTracing(const protos::DisableTracingRequest&,
100                                         DeferredDisableTracingResponse resp) {
101   GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
102   resp.Resolve(ipc::AsyncResult<protos::DisableTracingResponse>::Create());
103 }
104 
105 // Called by the IPC layer.
ReadBuffers(const protos::ReadBuffersRequest &,DeferredReadBuffersResponse resp)106 void ConsumerIPCService::ReadBuffers(const protos::ReadBuffersRequest&,
107                                      DeferredReadBuffersResponse resp) {
108   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
109   remote_consumer->read_buffers_response = std::move(resp);
110   remote_consumer->service_endpoint->ReadBuffers();
111 }
112 
113 // Called by the IPC layer.
FreeBuffers(const protos::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)114 void ConsumerIPCService::FreeBuffers(const protos::FreeBuffersRequest&,
115                                      DeferredFreeBuffersResponse resp) {
116   GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
117   resp.Resolve(ipc::AsyncResult<protos::FreeBuffersResponse>::Create());
118 }
119 
120 // Called by the IPC layer.
Flush(const protos::FlushRequest & req,DeferredFlushResponse resp)121 void ConsumerIPCService::Flush(const protos::FlushRequest& req,
122                                DeferredFlushResponse resp) {
123   auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
124                                             std::move(resp));
125   auto weak_this = weak_ptr_factory_.GetWeakPtr();
126   auto callback = [weak_this, it](bool success) {
127     if (weak_this)
128       weak_this->OnFlushCallback(success, std::move(it));
129   };
130   GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
131                                                           std::move(callback));
132 }
133 
134 // Called by the IPC layer.
Detach(const protos::DetachRequest & req,DeferredDetachResponse resp)135 void ConsumerIPCService::Detach(const protos::DetachRequest& req,
136                                 DeferredDetachResponse resp) {
137   // OnDetach() will resolve the |detach_response|.
138   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
139   remote_consumer->detach_response = std::move(resp);
140   remote_consumer->service_endpoint->Detach(req.key());
141 }
142 
143 // Called by the IPC layer.
Attach(const protos::AttachRequest & req,DeferredAttachResponse resp)144 void ConsumerIPCService::Attach(const protos::AttachRequest& req,
145                                 DeferredAttachResponse resp) {
146   // OnAttach() will resolve the |attach_response|.
147   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
148   remote_consumer->attach_response = std::move(resp);
149   remote_consumer->service_endpoint->Attach(req.key());
150 }
151 
152 // Called by the IPC layer.
GetTraceStats(const protos::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)153 void ConsumerIPCService::GetTraceStats(const protos::GetTraceStatsRequest&,
154                                        DeferredGetTraceStatsResponse resp) {
155   // OnTraceStats() will resolve the |get_trace_stats_response|.
156   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
157   remote_consumer->get_trace_stats_response = std::move(resp);
158   remote_consumer->service_endpoint->GetTraceStats();
159 }
160 
161 // Called by the IPC layer.
ObserveEvents(const protos::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)162 void ConsumerIPCService::ObserveEvents(const protos::ObserveEventsRequest& req,
163                                        DeferredObserveEventsResponse resp) {
164   RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
165 
166   // If there's a prior stream, close it so that client can clean it up.
167   remote_consumer->CloseObserveEventsResponseStream();
168 
169   remote_consumer->observe_events_response = std::move(resp);
170 
171   bool observe_instances = false;
172   for (const auto& type : req.events_to_observe()) {
173     switch (type) {
174       case protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES:
175         observe_instances = true;
176         break;
177       default:
178         PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type);
179         break;
180     }
181   }
182   remote_consumer->service_endpoint->ObserveEvents(observe_instances);
183 
184   // If no events are to be observed, close the stream immediately so that the
185   // client can clean up.
186   if (req.events_to_observe().size() == 0)
187     remote_consumer->CloseObserveEventsResponseStream();
188 }
189 
190 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)191 void ConsumerIPCService::OnFlushCallback(
192     bool success,
193     PendingFlushResponses::iterator pending_response_it) {
194   DeferredFlushResponse response(std::move(*pending_response_it));
195   pending_flush_responses_.erase(pending_response_it);
196   if (success) {
197     response.Resolve(ipc::AsyncResult<protos::FlushResponse>::Create());
198   } else {
199     response.Reject();
200   }
201 }
202 
203 ////////////////////////////////////////////////////////////////////////////////
204 // RemoteConsumer methods
205 ////////////////////////////////////////////////////////////////////////////////
206 
207 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
208 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
209 
210 // Invoked by the |core_service_| business logic after the ConnectConsumer()
211 // call. There is nothing to do here, we really expected the ConnectConsumer()
212 // to just work in the local case.
OnConnect()213 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
214 
215 // Invoked by the |core_service_| business logic after we destroy the
216 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()217 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
218 
OnTracingDisabled()219 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled() {
220   if (enable_tracing_response.IsBound()) {
221     auto result = ipc::AsyncResult<protos::EnableTracingResponse>::Create();
222     result->set_disabled(true);
223     enable_tracing_response.Resolve(std::move(result));
224   }
225 }
226 
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)227 void ConsumerIPCService::RemoteConsumer::OnTraceData(
228     std::vector<TracePacket> trace_packets,
229     bool has_more) {
230   if (!read_buffers_response.IsBound())
231     return;
232 
233   auto result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
234 
235   // A TracePacket might be too big to fit into a single IPC message (max
236   // kIPCBufferSize). However a TracePacket is made of slices and each slice
237   // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
238   // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
239   // if its slices don't fit within one IPC, chunk them over several contiguous
240   // IPCs using the |last_slice_for_packet| for glueing on the other side.
241   static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
242                 "kIPCBufferSize too small given the max possible slice size");
243 
244   auto send_ipc_reply = [this, &result](bool more) {
245     result.set_has_more(more);
246     read_buffers_response.Resolve(std::move(result));
247     result = ipc::AsyncResult<protos::ReadBuffersResponse>::Create();
248   };
249 
250   size_t approx_reply_size = 0;
251   for (const TracePacket& trace_packet : trace_packets) {
252     size_t num_slices_left_for_packet = trace_packet.slices().size();
253     for (const Slice& slice : trace_packet.slices()) {
254       // Check if this slice would cause the IPC to overflow its max size and,
255       // if that is the case, split the IPCs. The "16" and "64" below are
256       // over-estimations of, respectively:
257       // 16: the preamble that prefixes each slice (there are 2 x size fields
258       //     in the proto + the |last_slice_for_packet| bool).
259       // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
260       // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
261       // will hit a DCHECK anyways.
262       const size_t approx_slice_size = slice.size + 16;
263       if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
264         // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
265         PERFETTO_CHECK(result->slices_size() > 0);
266         send_ipc_reply(/*has_more=*/true);
267         approx_reply_size = 0;
268       }
269       approx_reply_size += approx_slice_size;
270 
271       auto* res_slice = result->add_slices();
272       res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
273       res_slice->set_data(slice.start, slice.size);
274     }
275   }
276   send_ipc_reply(has_more);
277 }
278 
OnDetach(bool success)279 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
280   if (!success) {
281     std::move(detach_response).Reject();
282     return;
283   }
284   auto resp = ipc::AsyncResult<protos::DetachResponse>::Create();
285   std::move(detach_response).Resolve(std::move(resp));
286 }
287 
OnAttach(bool success,const TraceConfig & trace_config)288 void ConsumerIPCService::RemoteConsumer::OnAttach(
289     bool success,
290     const TraceConfig& trace_config) {
291   if (!success) {
292     std::move(attach_response).Reject();
293     return;
294   }
295   auto response = ipc::AsyncResult<protos::AttachResponse>::Create();
296   trace_config.ToProto(response->mutable_trace_config());
297   std::move(attach_response).Resolve(std::move(response));
298 }
299 
OnTraceStats(bool success,const TraceStats & stats)300 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
301                                                       const TraceStats& stats) {
302   if (!success) {
303     std::move(get_trace_stats_response).Reject();
304     return;
305   }
306   auto response = ipc::AsyncResult<protos::GetTraceStatsResponse>::Create();
307   stats.ToProto(response->mutable_trace_stats());
308   std::move(get_trace_stats_response).Resolve(std::move(response));
309 }
310 
OnObservableEvents(const ObservableEvents & events)311 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
312     const ObservableEvents& events) {
313   if (!observe_events_response.IsBound())
314     return;
315 
316   auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
317   result.set_has_more(true);
318   events.ToProto(result->mutable_events());
319   observe_events_response.Resolve(std::move(result));
320 }
321 
CloseObserveEventsResponseStream()322 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
323   if (!observe_events_response.IsBound())
324     return;
325 
326   auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
327   result.set_has_more(false);
328   observe_events_response.Resolve(std::move(result));
329 }
330 
331 }  // namespace perfetto
332