1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/public/consumer_api.h"
18 
#include <fcntl.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <unistd.h>

#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
34 
35 #include "perfetto/base/build_config.h"
36 #include "perfetto/ext/base/scoped_file.h"
37 #include "perfetto/ext/base/temp_file.h"
38 #include "perfetto/ext/base/thread_checker.h"
39 #include "perfetto/ext/base/unix_task_runner.h"
40 #include "perfetto/ext/base/utils.h"
41 #include "perfetto/ext/tracing/core/consumer.h"
42 #include "perfetto/ext/tracing/core/trace_packet.h"
43 #include "perfetto/ext/tracing/ipc/consumer_ipc_client.h"
44 #include "perfetto/ext/tracing/ipc/default_socket.h"
45 #include "perfetto/tracing/core/trace_config.h"
46 
47 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
48 #include <linux/memfd.h>
49 #include <sys/syscall.h>
50 #endif
51 
52 #include "protos/perfetto/config/trace_config.gen.h"
53 
54 #define PERFETTO_EXPORTED_API __attribute__((visibility("default")))
55 
56 namespace perfetto {
57 namespace consumer {
58 
59 namespace {
60 
61 class TracingSession : public Consumer {
62  public:
63   TracingSession(base::TaskRunner*,
64                  Handle,
65                  OnStateChangedCb,
66                  void* callback_arg,
67                  const TraceConfig&);
68   ~TracingSession() override;
69 
70   // Note: if making this class moveable, the move-ctor/dtor must be updated
71   // to clear up mapped_buf_ on dtor.
72 
73   // These methods are called on a thread != |task_runner_|.
state() const74   State state() const { return state_; }
mapped_buf() const75   std::pair<char*, size_t> mapped_buf() const {
76     // The comparison operator will do an acquire-load on the atomic |state_|.
77     if (state_ == State::kTraceEnded)
78       return std::make_pair(mapped_buf_, mapped_buf_size_);
79     return std::make_pair(nullptr, 0);
80   }
81 
82   // All the methods below are called only on the |task_runner_| thread.
83 
84   bool Initialize();
85   void StartTracing();
86 
87   // perfetto::Consumer implementation.
88   void OnConnect() override;
89   void OnDisconnect() override;
90   void OnTracingDisabled(const std::string& error) override;
91   void OnTraceData(std::vector<TracePacket>, bool has_more) override;
92   void OnDetach(bool) override;
93   void OnAttach(bool, const TraceConfig&) override;
94   void OnTraceStats(bool, const TraceStats&) override;
95   void OnObservableEvents(const ObservableEvents&) override;
96 
97  private:
98   TracingSession(const TracingSession&) = delete;
99   TracingSession& operator=(const TracingSession&) = delete;
100 
101   void DestroyConnection();
102   void NotifyCallback();
103 
104   base::TaskRunner* const task_runner_;
105   Handle const handle_;
106   OnStateChangedCb const callback_ = nullptr;
107   void* const callback_arg_ = nullptr;
108   TraceConfig trace_config_;
109   base::ScopedFile buf_fd_;
110   std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
111 
112   // |mapped_buf_| and |mapped_buf_size_| are seq-consistent with |state_|.
113   std::atomic<State> state_{State::kIdle};
114   char* mapped_buf_ = nullptr;
115   size_t mapped_buf_size_ = 0;
116 
117   PERFETTO_THREAD_CHECKER(thread_checker_)
118 };
119 
TracingSession(base::TaskRunner * task_runner,Handle handle,OnStateChangedCb callback,void * callback_arg,const TraceConfig & trace_config_proto)120 TracingSession::TracingSession(base::TaskRunner* task_runner,
121                                Handle handle,
122                                OnStateChangedCb callback,
123                                void* callback_arg,
124                                const TraceConfig& trace_config_proto)
125     : task_runner_(task_runner),
126       handle_(handle),
127       callback_(callback),
128       callback_arg_(callback_arg) {
129   PERFETTO_DETACH_FROM_THREAD(thread_checker_);
130   trace_config_ = trace_config_proto;
131   trace_config_.set_write_into_file(true);
132 
133   // TODO(primiano): this really doesn't matter because the trace will be
134   // flushed into the file when stopping. We need a way to be able to say
135   // "disable periodic flushing and flush only when stopping".
136   trace_config_.set_file_write_period_ms(60000);
137 }
138 
~TracingSession()139 TracingSession::~TracingSession() {
140   PERFETTO_DCHECK_THREAD(thread_checker_);
141   if (mapped_buf_)
142     PERFETTO_CHECK(munmap(mapped_buf_, mapped_buf_size_) == 0);
143 }
144 
Initialize()145 bool TracingSession::Initialize() {
146   PERFETTO_DCHECK_THREAD(thread_checker_);
147 
148   if (state_ != State::kIdle)
149     return false;
150 
151 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
152   char memfd_name[64];
153   snprintf(memfd_name, sizeof(memfd_name), "perfetto_trace_%" PRId64, handle_);
154   buf_fd_.reset(
155       static_cast<int>(syscall(__NR_memfd_create, memfd_name, MFD_CLOEXEC)));
156 #else
157   // Fallback for testing on Linux/mac.
158   buf_fd_ = base::TempFile::CreateUnlinked().ReleaseFD();
159 #endif
160 
161   if (!buf_fd_) {
162     PERFETTO_PLOG("Failed to allocate temporary tracing buffer");
163     return false;
164   }
165 
166   state_ = State::kConnecting;
167   consumer_endpoint_ =
168       ConsumerIPCClient::Connect(GetConsumerSocket(), this, task_runner_);
169 
170   return true;
171 }
172 
173 // Called after EnabledTracing, soon after the IPC connection is established.
OnConnect()174 void TracingSession::OnConnect() {
175   PERFETTO_DCHECK_THREAD(thread_checker_);
176 
177   PERFETTO_DLOG("OnConnect");
178   PERFETTO_DCHECK(state_ == State::kConnecting);
179   consumer_endpoint_->EnableTracing(trace_config_,
180                                     base::ScopedFile(dup(*buf_fd_)));
181   if (trace_config_.deferred_start())
182     state_ = State::kConfigured;
183   else
184     state_ = State::kTracing;
185   NotifyCallback();
186 }
187 
StartTracing()188 void TracingSession::StartTracing() {
189   PERFETTO_DCHECK_THREAD(thread_checker_);
190 
191   auto state = state_.load();
192   if (state != State::kConfigured) {
193     PERFETTO_ELOG("StartTracing(): invalid state (%d)",
194                   static_cast<int>(state));
195     return;
196   }
197   state_ = State::kTracing;
198   consumer_endpoint_->StartTracing();
199 }
200 
OnTracingDisabled(const std::string & error)201 void TracingSession::OnTracingDisabled(const std::string& error) {
202   PERFETTO_DCHECK_THREAD(thread_checker_);
203   PERFETTO_DLOG("OnTracingDisabled %s", error.c_str());
204 
205   struct stat stat_buf {};
206   int res = fstat(buf_fd_.get(), &stat_buf);
207   mapped_buf_size_ = res == 0 ? static_cast<size_t>(stat_buf.st_size) : 0;
208   mapped_buf_ =
209       static_cast<char*>(mmap(nullptr, mapped_buf_size_, PROT_READ | PROT_WRITE,
210                               MAP_SHARED, buf_fd_.get(), 0));
211   DestroyConnection();
212   if (mapped_buf_size_ == 0 || mapped_buf_ == MAP_FAILED) {
213     mapped_buf_ = nullptr;
214     mapped_buf_size_ = 0;
215     state_ = State::kTraceFailed;
216     PERFETTO_ELOG("Tracing session failed");
217   } else {
218     state_ = State::kTraceEnded;
219   }
220   NotifyCallback();
221 }
222 
OnDisconnect()223 void TracingSession::OnDisconnect() {
224   PERFETTO_DCHECK_THREAD(thread_checker_);
225   PERFETTO_DLOG("OnDisconnect");
226   DestroyConnection();
227   state_ = State::kConnectionError;
228   NotifyCallback();
229 }
230 
OnDetach(bool)231 void TracingSession::OnDetach(bool) {
232   PERFETTO_DCHECK(false);  // Should never be called, Detach() is not used here.
233 }
234 
OnAttach(bool,const TraceConfig &)235 void TracingSession::OnAttach(bool, const TraceConfig&) {
236   PERFETTO_DCHECK(false);  // Should never be called, Attach() is not used here.
237 }
238 
OnTraceStats(bool,const TraceStats &)239 void TracingSession::OnTraceStats(bool, const TraceStats&) {
240   // Should never be called, GetTraceStats() is not used here.
241   PERFETTO_DCHECK(false);
242 }
243 
OnObservableEvents(const ObservableEvents &)244 void TracingSession::OnObservableEvents(const ObservableEvents&) {
245   // Should never be called, ObserveEvents() is not used here.
246   PERFETTO_DCHECK(false);
247 }
248 
DestroyConnection()249 void TracingSession::DestroyConnection() {
250   // Destroys the connection in a separate task. This is to avoid destroying
251   // the IPC connection directly from within the IPC callback.
252   TracingService::ConsumerEndpoint* endpoint = consumer_endpoint_.release();
253   task_runner_->PostTask([endpoint] { delete endpoint; });
254 }
255 
OnTraceData(std::vector<TracePacket>,bool)256 void TracingSession::OnTraceData(std::vector<TracePacket>, bool) {
257   // This should be never called because we are using |write_into_file| and
258   // asking the traced service to directly write into the |buf_fd_|.
259   PERFETTO_DFATAL("Should be unreachable.");
260 }
261 
NotifyCallback()262 void TracingSession::NotifyCallback() {
263   if (!callback_)
264     return;
265   auto state = state_.load();
266   auto callback = callback_;
267   auto handle = handle_;
268   auto callback_arg = callback_arg_;
269   task_runner_->PostTask([callback, callback_arg, handle, state] {
270     callback(handle, state, callback_arg);
271   });
272 }
273 
274 class TracingController {
275  public:
276   static TracingController* GetInstance();
277   TracingController();
278 
279   // These methods are called from a thread != |task_runner_|.
280   Handle Create(const void*, size_t, OnStateChangedCb, void* callback_arg);
281   void StartTracing(Handle);
282   State PollState(Handle);
283   TraceBuffer ReadTrace(Handle);
284   void Destroy(Handle);
285 
286  private:
287   void ThreadMain();  // Called on |task_runner_| thread.
288 
289   std::mutex mutex_;
290   std::unique_ptr<base::UnixTaskRunner> task_runner_;
291   std::condition_variable task_runner_initialized_;
292   Handle last_handle_ = 0;
293   std::map<Handle, std::unique_ptr<TracingSession>> sessions_;
294   std::thread thread_;  // Keep last.
295 };
296 
GetInstance()297 TracingController* TracingController::GetInstance() {
298   static TracingController* instance = new TracingController();
299   return instance;
300 }
301 
TracingController()302 TracingController::TracingController() {
303   std::unique_lock<std::mutex> lock(mutex_);
304   thread_ = std::thread(&TracingController::ThreadMain, this);
305   task_runner_initialized_.wait(lock, [this] { return !!task_runner_; });
306 }
307 
ThreadMain()308 void TracingController::ThreadMain() {
309   {
310     std::unique_lock<std::mutex> lock(mutex_);
311     task_runner_.reset(new base::UnixTaskRunner());
312   }
313   task_runner_initialized_.notify_one();
314   task_runner_->Run();
315 }
316 
Create(const void * config_proto_buf,size_t config_len,OnStateChangedCb callback,void * callback_arg)317 Handle TracingController::Create(const void* config_proto_buf,
318                                  size_t config_len,
319                                  OnStateChangedCb callback,
320                                  void* callback_arg) {
321   TraceConfig config_proto;
322   bool parsed = config_proto.ParseFromArray(config_proto_buf, config_len);
323   if (!parsed) {
324     PERFETTO_ELOG("Failed to decode TraceConfig proto");
325     return kInvalidHandle;
326   }
327 
328   if (!config_proto.duration_ms()) {
329     PERFETTO_ELOG("The trace config must specify a duration");
330     return kInvalidHandle;
331   }
332 
333   std::unique_lock<std::mutex> lock(mutex_);
334   Handle handle = ++last_handle_;
335   auto* session = new TracingSession(task_runner_.get(), handle, callback,
336                                      callback_arg, config_proto);
337   sessions_.emplace(handle, std::unique_ptr<TracingSession>(session));
338 
339   // Enable the TracingSession on its own thread.
340   task_runner_->PostTask([session] { session->Initialize(); });
341 
342   return handle;
343 }
344 
StartTracing(Handle handle)345 void TracingController::StartTracing(Handle handle) {
346   std::unique_lock<std::mutex> lock(mutex_);
347   auto it = sessions_.find(handle);
348   if (it == sessions_.end()) {
349     PERFETTO_ELOG("StartTracing(): Invalid tracing session handle");
350     return;
351   }
352   TracingSession* session = it->second.get();
353   task_runner_->PostTask([session] { session->StartTracing(); });
354 }
355 
PollState(Handle handle)356 State TracingController::PollState(Handle handle) {
357   std::unique_lock<std::mutex> lock(mutex_);
358   auto it = sessions_.find(handle);
359   if (it == sessions_.end())
360     return State::kSessionNotFound;
361   return it->second->state();
362 }
363 
ReadTrace(Handle handle)364 TraceBuffer TracingController::ReadTrace(Handle handle) {
365   TraceBuffer buf{};
366 
367   std::unique_lock<std::mutex> lock(mutex_);
368   auto it = sessions_.find(handle);
369   if (it == sessions_.end()) {
370     PERFETTO_DLOG("Handle invalid");
371     return buf;
372   }
373 
374   TracingSession* session = it->second.get();
375   auto state = session->state();
376   if (state == State::kTraceEnded) {
377     std::tie(buf.begin, buf.size) = session->mapped_buf();
378     return buf;
379   }
380 
381   PERFETTO_DLOG("ReadTrace(): called in an unexpected state (%d)",
382                 static_cast<int>(state));
383   return buf;
384 }
385 
Destroy(Handle handle)386 void TracingController::Destroy(Handle handle) {
387   // Post an empty task on the task runner to delete the session on its own
388   // thread.
389   std::unique_lock<std::mutex> lock(mutex_);
390   auto it = sessions_.find(handle);
391   if (it == sessions_.end())
392     return;
393   TracingSession* session = it->second.release();
394   sessions_.erase(it);
395   task_runner_->PostTask([session] { delete session; });
396 }
397 
398 }  // namespace
399 
Create(const void * config_proto,size_t config_len,OnStateChangedCb callback,void * callback_arg)400 PERFETTO_EXPORTED_API Handle Create(const void* config_proto,
401                                     size_t config_len,
402                                     OnStateChangedCb callback,
403                                     void* callback_arg) {
404   return TracingController::GetInstance()->Create(config_proto, config_len,
405                                                   callback, callback_arg);
406 }
407 
408 PERFETTO_EXPORTED_API
StartTracing(Handle handle)409 void StartTracing(Handle handle) {
410   return TracingController::GetInstance()->StartTracing(handle);
411 }
412 
PollState(Handle handle)413 PERFETTO_EXPORTED_API State PollState(Handle handle) {
414   return TracingController::GetInstance()->PollState(handle);
415 }
416 
ReadTrace(Handle handle)417 PERFETTO_EXPORTED_API TraceBuffer ReadTrace(Handle handle) {
418   return TracingController::GetInstance()->ReadTrace(handle);
419 }
420 
Destroy(Handle handle)421 PERFETTO_EXPORTED_API void Destroy(Handle handle) {
422   TracingController::GetInstance()->Destroy(handle);
423 }
424 }  // namespace consumer
425 }  // namespace perfetto
426