1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h"
18
19 #include <inttypes.h>
20 #include <string.h>
21
22 #include "perfetto/base/task_runner.h"
23 #include "perfetto/ext/ipc/client.h"
24 #include "perfetto/ext/tracing/core/consumer.h"
25 #include "perfetto/ext/tracing/core/observable_events.h"
26 #include "perfetto/ext/tracing/core/trace_stats.h"
27 #include "perfetto/tracing/core/trace_config.h"
28 #include "perfetto/tracing/core/tracing_service_state.h"
29
30 // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl
31 // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the
32 // Consumer* during the callbacks.
33
34 namespace perfetto {
35
36 // static. (Declared in include/tracing/ipc/consumer_ipc_client.h).
Connect(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)37 std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect(
38 const char* service_sock_name,
39 Consumer* consumer,
40 base::TaskRunner* task_runner) {
41 return std::unique_ptr<TracingService::ConsumerEndpoint>(
42 new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner));
43 }
44
ConsumerIPCClientImpl(const char * service_sock_name,Consumer * consumer,base::TaskRunner * task_runner)45 ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name,
46 Consumer* consumer,
47 base::TaskRunner* task_runner)
48 : consumer_(consumer),
49 ipc_channel_(
50 ipc::Client::CreateInstance({service_sock_name, /*sock_retry=*/false},
51 task_runner)),
52 consumer_port_(this /* event_listener */),
53 weak_ptr_factory_(this) {
54 ipc_channel_->BindService(consumer_port_.GetWeakPtr());
55 }
56
57 ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default;
58
59 // Called by the IPC layer if the BindService() succeeds.
OnConnect()60 void ConsumerIPCClientImpl::OnConnect() {
61 connected_ = true;
62 consumer_->OnConnect();
63 }
64
OnDisconnect()65 void ConsumerIPCClientImpl::OnDisconnect() {
66 PERFETTO_DLOG("Tracing service connection failure");
67 connected_ = false;
68 consumer_->OnDisconnect(); // Note: may delete |this|.
69 }
70
EnableTracing(const TraceConfig & trace_config,base::ScopedFile fd)71 void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config,
72 base::ScopedFile fd) {
73 if (!connected_) {
74 PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service");
75 return;
76 }
77
78 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
79 if (fd) {
80 consumer_->OnTracingDisabled(
81 "Passing FDs for write_into_file is not supported on Windows");
82 return;
83 }
84 #endif
85
86 protos::gen::EnableTracingRequest req;
87 *req.mutable_trace_config() = trace_config;
88 ipc::Deferred<protos::gen::EnableTracingResponse> async_response;
89 auto weak_this = weak_ptr_factory_.GetWeakPtr();
90 async_response.Bind(
91 [weak_this](
92 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
93 if (weak_this)
94 weak_this->OnEnableTracingResponse(std::move(response));
95 });
96
97 // |fd| will be closed when this function returns, but it's fine because the
98 // IPC layer dup()'s it when sending the IPC.
99 consumer_port_.EnableTracing(req, std::move(async_response), *fd);
100 }
101
ChangeTraceConfig(const TraceConfig & trace_config)102 void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig& trace_config) {
103 if (!connected_) {
104 PERFETTO_DLOG(
105 "Cannot ChangeTraceConfig(), not connected to tracing service");
106 return;
107 }
108
109 ipc::Deferred<protos::gen::ChangeTraceConfigResponse> async_response;
110 async_response.Bind(
111 [](ipc::AsyncResult<protos::gen::ChangeTraceConfigResponse> response) {
112 if (!response)
113 PERFETTO_DLOG("ChangeTraceConfig() failed");
114 });
115 protos::gen::ChangeTraceConfigRequest req;
116 *req.mutable_trace_config() = trace_config;
117 consumer_port_.ChangeTraceConfig(req, std::move(async_response));
118 }
119
StartTracing()120 void ConsumerIPCClientImpl::StartTracing() {
121 if (!connected_) {
122 PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service");
123 return;
124 }
125
126 ipc::Deferred<protos::gen::StartTracingResponse> async_response;
127 async_response.Bind(
128 [](ipc::AsyncResult<protos::gen::StartTracingResponse> response) {
129 if (!response)
130 PERFETTO_DLOG("StartTracing() failed");
131 });
132 protos::gen::StartTracingRequest req;
133 consumer_port_.StartTracing(req, std::move(async_response));
134 }
135
DisableTracing()136 void ConsumerIPCClientImpl::DisableTracing() {
137 if (!connected_) {
138 PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service");
139 return;
140 }
141
142 ipc::Deferred<protos::gen::DisableTracingResponse> async_response;
143 async_response.Bind(
144 [](ipc::AsyncResult<protos::gen::DisableTracingResponse> response) {
145 if (!response)
146 PERFETTO_DLOG("DisableTracing() failed");
147 });
148 consumer_port_.DisableTracing(protos::gen::DisableTracingRequest(),
149 std::move(async_response));
150 }
151
ReadBuffers()152 void ConsumerIPCClientImpl::ReadBuffers() {
153 if (!connected_) {
154 PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service");
155 return;
156 }
157
158 ipc::Deferred<protos::gen::ReadBuffersResponse> async_response;
159
160 // The IPC layer guarantees that callbacks are destroyed after this object
161 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
162 // contract of this class expects the caller to not destroy the Consumer class
163 // before having destroyed this class. Hence binding |this| here is safe.
164 async_response.Bind(
165 [this](ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
166 OnReadBuffersResponse(std::move(response));
167 });
168 consumer_port_.ReadBuffers(protos::gen::ReadBuffersRequest(),
169 std::move(async_response));
170 }
171
OnReadBuffersResponse(ipc::AsyncResult<protos::gen::ReadBuffersResponse> response)172 void ConsumerIPCClientImpl::OnReadBuffersResponse(
173 ipc::AsyncResult<protos::gen::ReadBuffersResponse> response) {
174 if (!response) {
175 PERFETTO_DLOG("ReadBuffers() failed");
176 return;
177 }
178 std::vector<TracePacket> trace_packets;
179 for (auto& resp_slice : response->slices()) {
180 const std::string& slice_data = resp_slice.data();
181 Slice slice = Slice::Allocate(slice_data.size());
182 memcpy(slice.own_data(), slice_data.data(), slice.size);
183 partial_packet_.AddSlice(std::move(slice));
184 if (resp_slice.last_slice_for_packet())
185 trace_packets.emplace_back(std::move(partial_packet_));
186 }
187 if (!trace_packets.empty() || !response.has_more())
188 consumer_->OnTraceData(std::move(trace_packets), response.has_more());
189 }
190
OnEnableTracingResponse(ipc::AsyncResult<protos::gen::EnableTracingResponse> response)191 void ConsumerIPCClientImpl::OnEnableTracingResponse(
192 ipc::AsyncResult<protos::gen::EnableTracingResponse> response) {
193 std::string error;
194 // |response| might be empty when the request gets rejected (if the connection
195 // with the service is dropped all outstanding requests are auto-rejected).
196 if (!response) {
197 error =
198 "EnableTracing IPC request rejected. This is likely due to a loss of "
199 "the traced connection";
200 } else {
201 error = response->error();
202 }
203 if (!response || response->disabled())
204 consumer_->OnTracingDisabled(error);
205 }
206
FreeBuffers()207 void ConsumerIPCClientImpl::FreeBuffers() {
208 if (!connected_) {
209 PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service");
210 return;
211 }
212
213 protos::gen::FreeBuffersRequest req;
214 ipc::Deferred<protos::gen::FreeBuffersResponse> async_response;
215 async_response.Bind(
216 [](ipc::AsyncResult<protos::gen::FreeBuffersResponse> response) {
217 if (!response)
218 PERFETTO_DLOG("FreeBuffers() failed");
219 });
220 consumer_port_.FreeBuffers(req, std::move(async_response));
221 }
222
Flush(uint32_t timeout_ms,FlushCallback callback)223 void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) {
224 if (!connected_) {
225 PERFETTO_DLOG("Cannot Flush(), not connected to tracing service");
226 return callback(/*success=*/false);
227 }
228
229 protos::gen::FlushRequest req;
230 req.set_timeout_ms(static_cast<uint32_t>(timeout_ms));
231 ipc::Deferred<protos::gen::FlushResponse> async_response;
232 async_response.Bind(
233 [callback](ipc::AsyncResult<protos::gen::FlushResponse> response) {
234 callback(!!response);
235 });
236 consumer_port_.Flush(req, std::move(async_response));
237 }
238
Detach(const std::string & key)239 void ConsumerIPCClientImpl::Detach(const std::string& key) {
240 if (!connected_) {
241 PERFETTO_DLOG("Cannot Detach(), not connected to tracing service");
242 return;
243 }
244
245 protos::gen::DetachRequest req;
246 req.set_key(key);
247 ipc::Deferred<protos::gen::DetachResponse> async_response;
248 auto weak_this = weak_ptr_factory_.GetWeakPtr();
249
250 async_response.Bind(
251 [weak_this](ipc::AsyncResult<protos::gen::DetachResponse> response) {
252 if (weak_this)
253 weak_this->consumer_->OnDetach(!!response);
254 });
255 consumer_port_.Detach(req, std::move(async_response));
256 }
257
Attach(const std::string & key)258 void ConsumerIPCClientImpl::Attach(const std::string& key) {
259 if (!connected_) {
260 PERFETTO_DLOG("Cannot Attach(), not connected to tracing service");
261 return;
262 }
263
264 {
265 protos::gen::AttachRequest req;
266 req.set_key(key);
267 ipc::Deferred<protos::gen::AttachResponse> async_response;
268 auto weak_this = weak_ptr_factory_.GetWeakPtr();
269
270 async_response.Bind(
271 [weak_this](ipc::AsyncResult<protos::gen::AttachResponse> response) {
272 if (!weak_this)
273 return;
274 if (!response) {
275 weak_this->consumer_->OnAttach(/*success=*/false, TraceConfig());
276 return;
277 }
278 const TraceConfig& trace_config = response->trace_config();
279
280 // If attached succesfully, also attach to the end-of-trace
281 // notificaton callback, via EnableTracing(attach_notification_only).
282 protos::gen::EnableTracingRequest enable_req;
283 enable_req.set_attach_notification_only(true);
284 ipc::Deferred<protos::gen::EnableTracingResponse> enable_resp;
285 enable_resp.Bind(
286 [weak_this](
287 ipc::AsyncResult<protos::gen::EnableTracingResponse> resp) {
288 if (weak_this)
289 weak_this->OnEnableTracingResponse(std::move(resp));
290 });
291 weak_this->consumer_port_.EnableTracing(enable_req,
292 std::move(enable_resp));
293
294 weak_this->consumer_->OnAttach(/*success=*/true, trace_config);
295 });
296 consumer_port_.Attach(req, std::move(async_response));
297 }
298 }
299
GetTraceStats()300 void ConsumerIPCClientImpl::GetTraceStats() {
301 if (!connected_) {
302 PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service");
303 return;
304 }
305
306 protos::gen::GetTraceStatsRequest req;
307 ipc::Deferred<protos::gen::GetTraceStatsResponse> async_response;
308
309 // The IPC layer guarantees that callbacks are destroyed after this object
310 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
311 // contract of this class expects the caller to not destroy the Consumer class
312 // before having destroyed this class. Hence binding |this| here is safe.
313 async_response.Bind(
314 [this](ipc::AsyncResult<protos::gen::GetTraceStatsResponse> response) {
315 if (!response) {
316 consumer_->OnTraceStats(/*success=*/false, TraceStats());
317 return;
318 }
319 consumer_->OnTraceStats(/*success=*/true, response->trace_stats());
320 });
321 consumer_port_.GetTraceStats(req, std::move(async_response));
322 }
323
ObserveEvents(uint32_t enabled_event_types)324 void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
325 if (!connected_) {
326 PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
327 return;
328 }
329
330 protos::gen::ObserveEventsRequest req;
331 for (uint32_t i = 0; i < 32; i++) {
332 const uint32_t event_id = 1u << i;
333 if (enabled_event_types & event_id)
334 req.add_events_to_observe(static_cast<ObservableEvents::Type>(event_id));
335 }
336
337 ipc::Deferred<protos::gen::ObserveEventsResponse> async_response;
338 // The IPC layer guarantees that callbacks are destroyed after this object
339 // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
340 // contract of this class expects the caller to not destroy the Consumer class
341 // before having destroyed this class. Hence binding |this| here is safe.
342 async_response.Bind(
343 [this](ipc::AsyncResult<protos::gen::ObserveEventsResponse> response) {
344 // Skip empty response, which the service sends to close the stream.
345 if (!response.has_more()) {
346 PERFETTO_DCHECK(!response.success());
347 return;
348 }
349 consumer_->OnObservableEvents(response->events());
350 });
351 consumer_port_.ObserveEvents(req, std::move(async_response));
352 }
353
QueryServiceState(QueryServiceStateCallback callback)354 void ConsumerIPCClientImpl::QueryServiceState(
355 QueryServiceStateCallback callback) {
356 if (!connected_) {
357 PERFETTO_DLOG(
358 "Cannot QueryServiceState(), not connected to tracing service");
359 return;
360 }
361
362 auto it = pending_query_svc_reqs_.insert(pending_query_svc_reqs_.end(),
363 {std::move(callback), {}});
364 protos::gen::QueryServiceStateRequest req;
365 ipc::Deferred<protos::gen::QueryServiceStateResponse> async_response;
366 auto weak_this = weak_ptr_factory_.GetWeakPtr();
367 async_response.Bind(
368 [weak_this,
369 it](ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response) {
370 if (weak_this)
371 weak_this->OnQueryServiceStateResponse(std::move(response), it);
372 });
373 consumer_port_.QueryServiceState(req, std::move(async_response));
374 }
375
OnQueryServiceStateResponse(ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,PendingQueryServiceRequests::iterator req_it)376 void ConsumerIPCClientImpl::OnQueryServiceStateResponse(
377 ipc::AsyncResult<protos::gen::QueryServiceStateResponse> response,
378 PendingQueryServiceRequests::iterator req_it) {
379 PERFETTO_DCHECK(req_it->callback);
380
381 if (!response) {
382 auto callback = std::move(req_it->callback);
383 pending_query_svc_reqs_.erase(req_it);
384 callback(false, TracingServiceState());
385 return;
386 }
387
388 // The QueryServiceState response can be split in several chunks if the
389 // service has several data sources. The client is supposed to merge all the
390 // replies. The easiest way to achieve this is to re-serialize the partial
391 // response and then re-decode the merged result in one shot.
392 std::vector<uint8_t>& merged_resp = req_it->merged_resp;
393 std::vector<uint8_t> part = response->service_state().SerializeAsArray();
394 merged_resp.insert(merged_resp.end(), part.begin(), part.end());
395
396 if (response.has_more())
397 return;
398
399 // All replies have been received. Decode the merged result and reply to the
400 // callback.
401 protos::gen::TracingServiceState svc_state;
402 bool ok = svc_state.ParseFromArray(merged_resp.data(), merged_resp.size());
403 if (!ok)
404 PERFETTO_ELOG("Failed to decode merged QueryServiceStateResponse");
405 auto callback = std::move(req_it->callback);
406 pending_query_svc_reqs_.erase(req_it);
407 callback(ok, std::move(svc_state));
408 }
409
QueryCapabilities(QueryCapabilitiesCallback callback)410 void ConsumerIPCClientImpl::QueryCapabilities(
411 QueryCapabilitiesCallback callback) {
412 if (!connected_) {
413 PERFETTO_DLOG(
414 "Cannot QueryCapabilities(), not connected to tracing service");
415 return;
416 }
417
418 protos::gen::QueryCapabilitiesRequest req;
419 ipc::Deferred<protos::gen::QueryCapabilitiesResponse> async_response;
420 async_response.Bind(
421 [callback](
422 ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse> response) {
423 if (!response) {
424 // If the IPC fails, we are talking to an older version of the service
425 // that didn't support QueryCapabilities at all. In this case return
426 // an empty capabilities message.
427 callback(TracingServiceCapabilities());
428 } else {
429 callback(response->capabilities());
430 }
431 });
432 consumer_port_.QueryCapabilities(req, std::move(async_response));
433 }
434
SaveTraceForBugreport(SaveTraceForBugreportCallback callback)435 void ConsumerIPCClientImpl::SaveTraceForBugreport(
436 SaveTraceForBugreportCallback callback) {
437 if (!connected_) {
438 PERFETTO_DLOG(
439 "Cannot SaveTraceForBugreport(), not connected to tracing service");
440 return;
441 }
442
443 protos::gen::SaveTraceForBugreportRequest req;
444 ipc::Deferred<protos::gen::SaveTraceForBugreportResponse> async_response;
445 async_response.Bind(
446 [callback](ipc::AsyncResult<protos::gen::SaveTraceForBugreportResponse>
447 response) {
448 if (!response) {
449 // If the IPC fails, we are talking to an older version of the service
450 // that didn't support SaveTraceForBugreport at all.
451 callback(
452 false,
453 "The tracing service doesn't support SaveTraceForBugreport()");
454 } else {
455 callback(response->success(), response->msg());
456 }
457 });
458 consumer_port_.SaveTraceForBugreport(req, std::move(async_response));
459 }
460
461 } // namespace perfetto
462