1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/service/consumer_ipc_service.h"
18
19 #include <inttypes.h>
20
21 #include "perfetto/base/logging.h"
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/base/scoped_file.h"
24 #include "perfetto/ext/ipc/basic_types.h"
25 #include "perfetto/ext/ipc/host.h"
26 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
27 #include "perfetto/ext/tracing/core/slice.h"
28 #include "perfetto/ext/tracing/core/trace_packet.h"
29 #include "perfetto/ext/tracing/core/trace_stats.h"
30 #include "perfetto/ext/tracing/core/tracing_service.h"
31 #include "perfetto/tracing/core/trace_config.h"
32 #include "perfetto/tracing/core/tracing_service_capabilities.h"
33 #include "perfetto/tracing/core/tracing_service_state.h"
34
35 namespace perfetto {
36
ConsumerIPCService(TracingService * core_service)37 ConsumerIPCService::ConsumerIPCService(TracingService* core_service)
38 : core_service_(core_service), weak_ptr_factory_(this) {}
39
40 ConsumerIPCService::~ConsumerIPCService() = default;
41
42 ConsumerIPCService::RemoteConsumer*
GetConsumerForCurrentRequest()43 ConsumerIPCService::GetConsumerForCurrentRequest() {
44 const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
45 const uid_t uid = ipc::Service::client_info().uid();
46 PERFETTO_CHECK(ipc_client_id);
47 auto it = consumers_.find(ipc_client_id);
48 if (it == consumers_.end()) {
49 auto* remote_consumer = new RemoteConsumer();
50 consumers_[ipc_client_id].reset(remote_consumer);
51 remote_consumer->service_endpoint =
52 core_service_->ConnectConsumer(remote_consumer, uid);
53 return remote_consumer;
54 }
55 return it->second.get();
56 }
57
58 // Called by the IPC layer.
OnClientDisconnected()59 void ConsumerIPCService::OnClientDisconnected() {
60 ipc::ClientID client_id = ipc::Service::client_info().client_id();
61 consumers_.erase(client_id);
62 }
63
64 // Called by the IPC layer.
EnableTracing(const protos::gen::EnableTracingRequest & req,DeferredEnableTracingResponse resp)65 void ConsumerIPCService::EnableTracing(
66 const protos::gen::EnableTracingRequest& req,
67 DeferredEnableTracingResponse resp) {
68 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
69 if (req.attach_notification_only()) {
70 remote_consumer->enable_tracing_response = std::move(resp);
71 return;
72 }
73 const TraceConfig& trace_config = req.trace_config();
74 base::ScopedFile fd;
75 if (trace_config.write_into_file() && trace_config.output_path().empty())
76 fd = ipc::Service::TakeReceivedFD();
77 remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
78 remote_consumer->enable_tracing_response = std::move(resp);
79 }
80
81 // Called by the IPC layer.
StartTracing(const protos::gen::StartTracingRequest &,DeferredStartTracingResponse resp)82 void ConsumerIPCService::StartTracing(const protos::gen::StartTracingRequest&,
83 DeferredStartTracingResponse resp) {
84 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
85 remote_consumer->service_endpoint->StartTracing();
86 resp.Resolve(ipc::AsyncResult<protos::gen::StartTracingResponse>::Create());
87 }
88
89 // Called by the IPC layer.
ChangeTraceConfig(const protos::gen::ChangeTraceConfigRequest & req,DeferredChangeTraceConfigResponse resp)90 void ConsumerIPCService::ChangeTraceConfig(
91 const protos::gen::ChangeTraceConfigRequest& req,
92 DeferredChangeTraceConfigResponse resp) {
93 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
94 remote_consumer->service_endpoint->ChangeTraceConfig(req.trace_config());
95 resp.Resolve(
96 ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse>::Create());
97 }
98
99 // Called by the IPC layer.
DisableTracing(const protos::gen::DisableTracingRequest &,DeferredDisableTracingResponse resp)100 void ConsumerIPCService::DisableTracing(
101 const protos::gen::DisableTracingRequest&,
102 DeferredDisableTracingResponse resp) {
103 GetConsumerForCurrentRequest()->service_endpoint->DisableTracing();
104 resp.Resolve(ipc::AsyncResult<protos::gen::DisableTracingResponse>::Create());
105 }
106
107 // Called by the IPC layer.
ReadBuffers(const protos::gen::ReadBuffersRequest &,DeferredReadBuffersResponse resp)108 void ConsumerIPCService::ReadBuffers(const protos::gen::ReadBuffersRequest&,
109 DeferredReadBuffersResponse resp) {
110 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
111 remote_consumer->read_buffers_response = std::move(resp);
112 remote_consumer->service_endpoint->ReadBuffers();
113 }
114
115 // Called by the IPC layer.
FreeBuffers(const protos::gen::FreeBuffersRequest &,DeferredFreeBuffersResponse resp)116 void ConsumerIPCService::FreeBuffers(const protos::gen::FreeBuffersRequest&,
117 DeferredFreeBuffersResponse resp) {
118 GetConsumerForCurrentRequest()->service_endpoint->FreeBuffers();
119 resp.Resolve(ipc::AsyncResult<protos::gen::FreeBuffersResponse>::Create());
120 }
121
122 // Called by the IPC layer.
Flush(const protos::gen::FlushRequest & req,DeferredFlushResponse resp)123 void ConsumerIPCService::Flush(const protos::gen::FlushRequest& req,
124 DeferredFlushResponse resp) {
125 auto it = pending_flush_responses_.insert(pending_flush_responses_.end(),
126 std::move(resp));
127 auto weak_this = weak_ptr_factory_.GetWeakPtr();
128 auto callback = [weak_this, it](bool success) {
129 if (weak_this)
130 weak_this->OnFlushCallback(success, std::move(it));
131 };
132 GetConsumerForCurrentRequest()->service_endpoint->Flush(req.timeout_ms(),
133 std::move(callback));
134 }
135
136 // Called by the IPC layer.
Detach(const protos::gen::DetachRequest & req,DeferredDetachResponse resp)137 void ConsumerIPCService::Detach(const protos::gen::DetachRequest& req,
138 DeferredDetachResponse resp) {
139 // OnDetach() will resolve the |detach_response|.
140 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
141 remote_consumer->detach_response = std::move(resp);
142 remote_consumer->service_endpoint->Detach(req.key());
143 }
144
145 // Called by the IPC layer.
Attach(const protos::gen::AttachRequest & req,DeferredAttachResponse resp)146 void ConsumerIPCService::Attach(const protos::gen::AttachRequest& req,
147 DeferredAttachResponse resp) {
148 // OnAttach() will resolve the |attach_response|.
149 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
150 remote_consumer->attach_response = std::move(resp);
151 remote_consumer->service_endpoint->Attach(req.key());
152 }
153
154 // Called by the IPC layer.
GetTraceStats(const protos::gen::GetTraceStatsRequest &,DeferredGetTraceStatsResponse resp)155 void ConsumerIPCService::GetTraceStats(const protos::gen::GetTraceStatsRequest&,
156 DeferredGetTraceStatsResponse resp) {
157 // OnTraceStats() will resolve the |get_trace_stats_response|.
158 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
159 remote_consumer->get_trace_stats_response = std::move(resp);
160 remote_consumer->service_endpoint->GetTraceStats();
161 }
162
163 // Called by the IPC layer.
ObserveEvents(const protos::gen::ObserveEventsRequest & req,DeferredObserveEventsResponse resp)164 void ConsumerIPCService::ObserveEvents(
165 const protos::gen::ObserveEventsRequest& req,
166 DeferredObserveEventsResponse resp) {
167 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
168
169 // If there's a prior stream, close it so that client can clean it up.
170 remote_consumer->CloseObserveEventsResponseStream();
171
172 remote_consumer->observe_events_response = std::move(resp);
173
174 uint32_t events_mask = 0;
175 for (const auto& type : req.events_to_observe()) {
176 events_mask |= static_cast<uint32_t>(type);
177 }
178 remote_consumer->service_endpoint->ObserveEvents(events_mask);
179
180 // If no events are to be observed, close the stream immediately so that the
181 // client can clean up.
182 if (events_mask == 0)
183 remote_consumer->CloseObserveEventsResponseStream();
184 }
185
186 // Called by the IPC layer.
QueryServiceState(const protos::gen::QueryServiceStateRequest &,DeferredQueryServiceStateResponse resp)187 void ConsumerIPCService::QueryServiceState(
188 const protos::gen::QueryServiceStateRequest&,
189 DeferredQueryServiceStateResponse resp) {
190 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
191 auto it = pending_query_service_responses_.insert(
192 pending_query_service_responses_.end(), std::move(resp));
193 auto weak_this = weak_ptr_factory_.GetWeakPtr();
194 auto callback = [weak_this, it](bool success,
195 const TracingServiceState& svc_state) {
196 if (weak_this)
197 weak_this->OnQueryServiceCallback(success, svc_state, std::move(it));
198 };
199 remote_consumer->service_endpoint->QueryServiceState(callback);
200 }
201
202 // Called by the service in response to service_endpoint->QueryServiceState().
OnQueryServiceCallback(bool success,const TracingServiceState & svc_state,PendingQuerySvcResponses::iterator pending_response_it)203 void ConsumerIPCService::OnQueryServiceCallback(
204 bool success,
205 const TracingServiceState& svc_state,
206 PendingQuerySvcResponses::iterator pending_response_it) {
207 DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
208 pending_query_service_responses_.erase(pending_response_it);
209 if (!success) {
210 response.Reject();
211 return;
212 }
213
214 // The TracingServiceState object might be too big to fit into a single IPC
215 // message because it contains the DataSourceDescriptor of each data source.
216 // Here we split it in chunks to fit in the IPC limit, observing the
217 // following rule: each chunk must be invididually a valid TracingServiceState
218 // message; all the chunks concatenated together must form the original
219 // message. This is to deal with the legacy API that was just sending one
220 // whole message (failing in presence of too many data sources, b/153142114).
221 // The message is split as follows: we take the whole TracingServiceState,
222 // take out the data sources section (which is a top-level repeated field)
223 // and re-add them one-by-one. If, in the process of appending, the IPC msg
224 // size is reached, a new chunk is created. This assumes that the rest of
225 // TracingServiceState fits in one IPC message and each DataSourceDescriptor
226 // fits in the worst case in a dedicated message (which is true, because
227 // otherwise the RegisterDataSource() which passes the descriptor in the first
228 // place would fail).
229
230 std::vector<uint8_t> chunked_reply;
231
232 // Transmits the current chunk and starts a new one.
233 bool sent_eof = false;
234 auto send_chunked_reply = [&chunked_reply, &response,
235 &sent_eof](bool has_more) {
236 PERFETTO_CHECK(!sent_eof);
237 sent_eof = !has_more;
238 auto resp =
239 ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
240 resp.set_has_more(has_more);
241 PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
242 chunked_reply.data(), chunked_reply.size()));
243 chunked_reply.clear();
244 response.Resolve(std::move(resp));
245 };
246
247 // Create a copy of the whole response and cut away the data_sources section.
248 protos::gen::TracingServiceState svc_state_copy = svc_state;
249 auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
250 chunked_reply = svc_state_copy.SerializeAsArray();
251
252 // Now re-add them fitting within the IPC message limits (- some margin for
253 // the outer IPC frame).
254 constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
255 for (const auto& data_source : data_sources) {
256 protos::gen::TracingServiceState tmp;
257 tmp.mutable_data_sources()->emplace_back(std::move(data_source));
258 std::vector<uint8_t> chunk = tmp.SerializeAsArray();
259 if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
260 chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
261 } else {
262 send_chunked_reply(/*has_more=*/true);
263 chunked_reply = std::move(chunk);
264 }
265 }
266
267 PERFETTO_DCHECK(!chunked_reply.empty());
268 send_chunked_reply(/*has_more=*/false);
269 PERFETTO_CHECK(sent_eof);
270 }
271
272 // Called by the service in response to a service_endpoint->Flush() request.
OnFlushCallback(bool success,PendingFlushResponses::iterator pending_response_it)273 void ConsumerIPCService::OnFlushCallback(
274 bool success,
275 PendingFlushResponses::iterator pending_response_it) {
276 DeferredFlushResponse response(std::move(*pending_response_it));
277 pending_flush_responses_.erase(pending_response_it);
278 if (success) {
279 response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
280 } else {
281 response.Reject();
282 }
283 }
284
QueryCapabilities(const protos::gen::QueryCapabilitiesRequest &,DeferredQueryCapabilitiesResponse resp)285 void ConsumerIPCService::QueryCapabilities(
286 const protos::gen::QueryCapabilitiesRequest&,
287 DeferredQueryCapabilitiesResponse resp) {
288 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
289 auto it = pending_query_capabilities_responses_.insert(
290 pending_query_capabilities_responses_.end(), std::move(resp));
291 auto weak_this = weak_ptr_factory_.GetWeakPtr();
292 auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
293 if (weak_this)
294 weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
295 };
296 remote_consumer->service_endpoint->QueryCapabilities(callback);
297 }
298
299 // Called by the service in response to service_endpoint->QueryCapabilities().
OnQueryCapabilitiesCallback(const TracingServiceCapabilities & caps,PendingQueryCapabilitiesResponses::iterator pending_response_it)300 void ConsumerIPCService::OnQueryCapabilitiesCallback(
301 const TracingServiceCapabilities& caps,
302 PendingQueryCapabilitiesResponses::iterator pending_response_it) {
303 DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
304 pending_query_capabilities_responses_.erase(pending_response_it);
305 auto resp =
306 ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
307 *resp->mutable_capabilities() = caps;
308 response.Resolve(std::move(resp));
309 }
310
SaveTraceForBugreport(const protos::gen::SaveTraceForBugreportRequest &,DeferredSaveTraceForBugreportResponse resp)311 void ConsumerIPCService::SaveTraceForBugreport(
312 const protos::gen::SaveTraceForBugreportRequest&,
313 DeferredSaveTraceForBugreportResponse resp) {
314 RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
315 auto it = pending_bugreport_responses_.insert(
316 pending_bugreport_responses_.end(), std::move(resp));
317 auto weak_this = weak_ptr_factory_.GetWeakPtr();
318 auto callback = [weak_this, it](bool success, const std::string& msg) {
319 if (weak_this)
320 weak_this->OnSaveTraceForBugreportCallback(success, msg, std::move(it));
321 };
322 remote_consumer->service_endpoint->SaveTraceForBugreport(callback);
323 }
324
325 // Called by the service in response to
326 // service_endpoint->SaveTraceForBugreport().
OnSaveTraceForBugreportCallback(bool success,const std::string & msg,PendingSaveTraceForBugreportResponses::iterator pending_response_it)327 void ConsumerIPCService::OnSaveTraceForBugreportCallback(
328 bool success,
329 const std::string& msg,
330 PendingSaveTraceForBugreportResponses::iterator pending_response_it) {
331 DeferredSaveTraceForBugreportResponse response(
332 std::move(*pending_response_it));
333 pending_bugreport_responses_.erase(pending_response_it);
334 auto resp =
335 ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>::Create();
336 resp->set_success(success);
337 resp->set_msg(msg);
338 response.Resolve(std::move(resp));
339 }
340
341 ////////////////////////////////////////////////////////////////////////////////
342 // RemoteConsumer methods
343 ////////////////////////////////////////////////////////////////////////////////
344
345 ConsumerIPCService::RemoteConsumer::RemoteConsumer() = default;
346 ConsumerIPCService::RemoteConsumer::~RemoteConsumer() = default;
347
348 // Invoked by the |core_service_| business logic after the ConnectConsumer()
349 // call. There is nothing to do here, we really expected the ConnectConsumer()
350 // to just work in the local case.
OnConnect()351 void ConsumerIPCService::RemoteConsumer::OnConnect() {}
352
353 // Invoked by the |core_service_| business logic after we destroy the
354 // |service_endpoint| (in the RemoteConsumer dtor).
OnDisconnect()355 void ConsumerIPCService::RemoteConsumer::OnDisconnect() {}
356
OnTracingDisabled(const std::string & error)357 void ConsumerIPCService::RemoteConsumer::OnTracingDisabled(
358 const std::string& error) {
359 if (enable_tracing_response.IsBound()) {
360 auto result =
361 ipc::AsyncResult<protos::gen::EnableTracingResponse>::Create();
362 result->set_disabled(true);
363 if (!error.empty())
364 result->set_error(error);
365 enable_tracing_response.Resolve(std::move(result));
366 }
367 }
368
OnTraceData(std::vector<TracePacket> trace_packets,bool has_more)369 void ConsumerIPCService::RemoteConsumer::OnTraceData(
370 std::vector<TracePacket> trace_packets,
371 bool has_more) {
372 if (!read_buffers_response.IsBound())
373 return;
374
375 auto result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
376
377 // A TracePacket might be too big to fit into a single IPC message (max
378 // kIPCBufferSize). However a TracePacket is made of slices and each slice
379 // is way smaller than kIPCBufferSize (a slice size is effectively bounded by
380 // the max chunk size of the SharedMemoryABI). When sending a TracePacket,
381 // if its slices don't fit within one IPC, chunk them over several contiguous
382 // IPCs using the |last_slice_for_packet| for glueing on the other side.
383 static_assert(ipc::kIPCBufferSize >= SharedMemoryABI::kMaxPageSize * 2,
384 "kIPCBufferSize too small given the max possible slice size");
385
386 auto send_ipc_reply = [this, &result](bool more) {
387 result.set_has_more(more);
388 read_buffers_response.Resolve(std::move(result));
389 result = ipc::AsyncResult<protos::gen::ReadBuffersResponse>::Create();
390 };
391
392 size_t approx_reply_size = 0;
393 for (const TracePacket& trace_packet : trace_packets) {
394 size_t num_slices_left_for_packet = trace_packet.slices().size();
395 for (const Slice& slice : trace_packet.slices()) {
396 // Check if this slice would cause the IPC to overflow its max size and,
397 // if that is the case, split the IPCs. The "16" and "64" below are
398 // over-estimations of, respectively:
399 // 16: the preamble that prefixes each slice (there are 2 x size fields
400 // in the proto + the |last_slice_for_packet| bool).
401 // 64: the overhead of the IPC InvokeMethodReply + wire_protocol's frame.
402 // If these estimations are wrong, BufferedFrameDeserializer::Serialize()
403 // will hit a DCHECK anyways.
404 const size_t approx_slice_size = slice.size + 16;
405 if (approx_reply_size + approx_slice_size > ipc::kIPCBufferSize - 64) {
406 // If we hit this CHECK we got a single slice that is > kIPCBufferSize.
407 PERFETTO_CHECK(result->slices_size() > 0);
408 send_ipc_reply(/*has_more=*/true);
409 approx_reply_size = 0;
410 }
411 approx_reply_size += approx_slice_size;
412
413 auto* res_slice = result->add_slices();
414 res_slice->set_last_slice_for_packet(--num_slices_left_for_packet == 0);
415 res_slice->set_data(slice.start, slice.size);
416 }
417 }
418 send_ipc_reply(has_more);
419 }
420
OnDetach(bool success)421 void ConsumerIPCService::RemoteConsumer::OnDetach(bool success) {
422 if (!success) {
423 std::move(detach_response).Reject();
424 return;
425 }
426 auto resp = ipc::AsyncResult<protos::gen::DetachResponse>::Create();
427 std::move(detach_response).Resolve(std::move(resp));
428 }
429
OnAttach(bool success,const TraceConfig & trace_config)430 void ConsumerIPCService::RemoteConsumer::OnAttach(
431 bool success,
432 const TraceConfig& trace_config) {
433 if (!success) {
434 std::move(attach_response).Reject();
435 return;
436 }
437 auto response = ipc::AsyncResult<protos::gen::AttachResponse>::Create();
438 *response->mutable_trace_config() = trace_config;
439 std::move(attach_response).Resolve(std::move(response));
440 }
441
OnTraceStats(bool success,const TraceStats & stats)442 void ConsumerIPCService::RemoteConsumer::OnTraceStats(bool success,
443 const TraceStats& stats) {
444 if (!success) {
445 std::move(get_trace_stats_response).Reject();
446 return;
447 }
448 auto response =
449 ipc::AsyncResult<protos::gen::GetTraceStatsResponse>::Create();
450 *response->mutable_trace_stats() = stats;
451 std::move(get_trace_stats_response).Resolve(std::move(response));
452 }
453
OnObservableEvents(const ObservableEvents & events)454 void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
455 const ObservableEvents& events) {
456 if (!observe_events_response.IsBound())
457 return;
458
459 auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
460 result.set_has_more(true);
461 *result->mutable_events() = events;
462 observe_events_response.Resolve(std::move(result));
463 }
464
CloseObserveEventsResponseStream()465 void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
466 if (!observe_events_response.IsBound())
467 return;
468
469 auto result = ipc::AsyncResult<protos::gen::ObserveEventsResponse>::Create();
470 result.set_has_more(false);
471 observe_events_response.Resolve(std::move(result));
472 }
473
474 } // namespace perfetto
475