1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IORAP_SRC_PERFETTO_RX_PRODUCER_H_
16 #define IORAP_SRC_PERFETTO_RX_PRODUCER_H_
17 
#include "perfetto/perfetto_consumer.h"       // libiorap

#include <perfetto/config/trace_config.pb.h>  // libperfetto
#include <rxcpp/rx.hpp>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
27 
28 namespace iorap::perfetto {
29 
// Groups the fruit dependency-injection aliases and factory functions for the
// two injectable types used here: PerfettoConsumer and
// ::perfetto::protos::TraceConfig.
struct PerfettoDependencies {
  // The three aliases below all expose the same pair of types.
  using Component =
      fruit::Component<PerfettoConsumer, ::perfetto::protos::TraceConfig>;
  using Injector =
      fruit::Injector<PerfettoConsumer, ::perfetto::protos::TraceConfig>;
  using NormalizedComponent =
      fruit::NormalizedComponent<PerfettoConsumer, ::perfetto::protos::TraceConfig>;

  // Create a 'live' component that will talk to perfetto via traced.
  static Component CreateComponent(/*TODO: config params*/);

  // Create a perfetto.protos.TraceConfig. The config is returned by value as
  // a protobuf message object (this function does not return a serialized
  // string).
  //
  // The following ftrace events are enabled:
  // * mm_filemap_add_to_page_cache
  // * mm_filemap_delete_from_page_cache
  //
  // If deferred starting is also enabled, no tracing will begin until
  // ::perfetto::consumer::StartTracing is invoked.
  //
  // Parameters:
  //   duration_ms    - presumably the trace duration in milliseconds.
  //   deferred_start - whether deferred starting is enabled (default: true).
  //   buffer_size    - default 4096. NOTE(review): the unit is not visible in
  //                    this header; confirm against the implementation.
  static ::perfetto::protos::TraceConfig CreateConfig(uint32_t duration_ms,
                                                      bool deferred_start = true,
                                                      uint32_t buffer_size = 4096);
};
53 
54 namespace detail {
55   template <typename T>
56   struct concept_message_lite_base {
57     static_assert(std::is_base_of_v<::google::protobuf::MessageLite, T>,
58                   "T must inherit from MessageLite");
59     using type = T;
60   };
61 
62   template <typename T>
63   using concept_message_lite_base_t = typename concept_message_lite_base<T>::type;
64 }  // namespace detail
65 
66 /*
67  * In Android's version of libprotobuf, move-constructors are not generated.
68  * This results in a legitimate (~10sec per TracePacket being compiled) slowdown,
69  * so we need to avoid it everywhere.
70  *
71  * 1) Don't copy the protos, move them instead.
72  * 2) Use 'shared_ptr' because rxcpp won't compile with unique_ptr.
73  */
74 template <typename T>
75 using ProtobufPtr = std::shared_ptr<detail::concept_message_lite_base_t<const T>>;
76 
77 template <typename T>
78 using ProtobufMutablePtr = std::shared_ptr<detail::concept_message_lite_base_t<T>>;
79 
80 // This acts as a lightweight type marker so that we know what data has actually
81 // encoded under the hood.
82 template <typename T>
83 struct BinaryWireProtobuf {
84   static_assert(std::is_base_of_v<::google::protobuf::MessageLite, T>,
85                 "T should be a base class of MessageLite");
86 
dataBinaryWireProtobuf87   std::vector<std::byte>& data() {
88     return data_;
89   }
90 
dataBinaryWireProtobuf91   const std::vector<std::byte>& data() const {
92     return data_;
93   }
94 
sizeBinaryWireProtobuf95   size_t size() const {
96     return data_.size();
97   }
98 
BinaryWireProtobufBinaryWireProtobuf99   explicit BinaryWireProtobuf(char* data, size_t size)
100     : BinaryWireProtobuf(reinterpret_cast<std::byte*>(data), size) {
101   }
102 
BinaryWireProtobufBinaryWireProtobuf103   explicit BinaryWireProtobuf(std::byte* data, size_t size) {
104     data_.resize(size);
105     std::copy(data,
106               data + size,
107               data_.data());
108   }
109 
BinaryWireProtobufBinaryWireProtobuf110   explicit BinaryWireProtobuf(std::vector<std::byte> data) : data_{std::move(data)} {
111   }
112 
113   // You wouldn't want to accidentally copy a giant multi-megabyte chunk would you?
114   // BinaryWireProtobuf(const BinaryWireProtobuf& other) = delete;  // FIXME: rx likes to copy.
115   BinaryWireProtobuf(const BinaryWireProtobuf& other) = default;
116   BinaryWireProtobuf(BinaryWireProtobuf&& other) = default;
117 
118   // Important: Deserialization could fail, for example data is truncated or
119   // some minor disc corruption occurred.
120   template <typename U>
MaybeUnserializeBinaryWireProtobuf121   std::optional<ProtobufPtr<U>> MaybeUnserialize() {
122     ProtobufMutablePtr<U> unencoded{new U{}};
123 
124     if (!unencoded->ParseFromArray(data_.data(), data_.size())) {
125       return std::nullopt;
126     }
127 
128     return {std::move(unencoded)};
129   }
130 
131   bool WriteFullyToFile(const std::string& path,
132                         bool follow_symlinks = false) const;
133 
134   static std::optional<BinaryWireProtobuf<T>> ReadFullyFromFile(const std::string& path,
135                                                                 bool follow_symlinks = false);
136 
137   bool operator==(const BinaryWireProtobuf<T>& other) const;
138   bool operator!=(const BinaryWireProtobuf<T>& other) const {
139     return !(*this == other);
140   }
141 
142  private:
143   static bool CleanUpAfterFailedWrite(const std::string& path);
144   bool WriteStringToFd(int fd) const;
145 
146   static bool ReadFdToString(int fd, /*out*/std::vector<std::byte>* data);
147 
148   std::vector<std::byte> data_;
149 };
150 
// Wire-encoded perfetto trace buffer.
// The marker type is currently the generic MessageLite base; the alias naming
// the concrete ::perfetto::protos::Trace type is kept below, commented out.
//using PerfettoTraceProto = BinaryWireProtobuf<::perfetto::protos::Trace>;
using PerfettoTraceProto = BinaryWireProtobuf<::google::protobuf::MessageLite>;
153 
// Commands that are pushed into RxProducerFactory::CreateTraceStream.
// The note after each value lists what the trace stream may emit in response.
enum class PerfettoStreamCommand {
  kStartTracing, // -> () | on_error
  kStopTracing,  // -> on_next(PerfettoTraceProto) | on_error
  kShutdown,     // -> on_completed | on_error
  // XX: should shutdown be converted to use the rx subscriber#unsubscribe instead?
};
160 
// Stream-insertion operator for PerfettoStreamCommand; returns the std::ostream&
// it was given per the usual convention (declared here, defined elsewhere).
std::ostream& operator<<(std::ostream& os, PerfettoStreamCommand c);
162 
// Factory for rx observables that produce perfetto traces, using the
// dependencies provided by a PerfettoDependencies::Injector.
struct RxProducerFactory {
  // Passing anything by value leads to a lot of pain and headache.
  // Pass in the injector by reference because nothing else seems to work.
  //
  // NOTE(review): the factory stores only a reference (see injector_), so the
  // injector presumably has to outlive this object; confirm in the .cc file.
  explicit RxProducerFactory(PerfettoDependencies::Injector& injector);

  // Create a one-shot perfetto observable that will begin
  // asynchronously producing a PerfettoTraceProto after the 'kStartTracing'
  // command is observed.
  //
  // libperfetto is immediately primed (i.e. connected in a deferred state)
  // upon calling this function, to reduce the latency of 'kStartTracing'.
  //
  // To finish the trace, push 'kStopTracing'. To cancel or tear down at any
  // time, push 'kShutdown'.
  //
  // The TraceProto may come out at any time after 'kStartTracing',
  // this is controlled by duration_ms in the TraceConfig.
  //
  // TODO: libperfetto should actually stop tracing when we ask it to,
  // instead of using a hardcoded time.
  //
  // The observable may go into #on_error at any time, if the underlying
  // libperfetto states transition to a failing state.
  // This usually means the OS is not configured correctly.
  //
  // 'commands' is the stream of PerfettoStreamCommand values that drives the
  // trace; the returned observable carries the resulting PerfettoTraceProto.
  rxcpp::observable<PerfettoTraceProto> CreateTraceStream(
      rxcpp::observable<PerfettoStreamCommand> commands);

  // TODO: is this refactor-able into a subscriber factory that takes
  // the commands-observable as a parameter?

  // TODO: infinite perfetto stream.

 private:
  // XX: why doesn't this just let me pass in a regular Component?
  // Held by reference, not owned.
  PerfettoDependencies::Injector& injector_;

  // Gives this free function access to the factory's private members.
  friend void CollectPerfettoTraceBufferImmediately(
      RxProducerFactory& producer_factory,
      const std::string& arg_output_proto);
};
203 
// Returns an rx Coordination, which will cause a new thread to spawn for each
// new Worker.
//
// Idle-class priority is set for the CPU and IO priorities on the new thread.
//
// TODO: move to separate file
rxcpp::observe_on_one_worker ObserveOnNewIoThread();
210 
211 }  // namespace iorap::perfetto
212 #endif  // IORAP_SRC_PERFETTO_RX_PRODUCER_H_
213