1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "compiler/compiler.h"
16 
17 #include "common/debug.h"
18 #include "common/expected.h"
19 
20 #include "perfetto/rx_producer.h"  // TODO: refactor BinaryWireProtobuf to separate header.
21 #include "inode2filename/inode.h"
22 #include "inode2filename/search_directories.h"
23 #include "serialize/protobuf_io.h"
24 
25 #include <android-base/unique_fd.h>
26 #include <android-base/parseint.h>
27 #include <android-base/file.h>
28 
29 #include <perfetto/trace/trace.pb.h>  // ::perfetto::protos::Trace
30 #include <perfetto/trace/trace_packet.pb.h>  // ::perfetto::protos::TracePacket
31 
32 #include "rxcpp/rx.hpp"
#include <fstream>
#include <iostream>
#include <memory>
#include <optional>
#include <regex>
#include <utility>
38 
39 #include <sched.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <syscall.h>
43 #include <fcntl.h>
44 #include <unistd.h>
45 
46 namespace iorap::compiler {
47 
48 using Inode = iorap::inode2filename::Inode;
49 using InodeResult = iorap::inode2filename::InodeResult;
50 using SearchDirectories = iorap::inode2filename::SearchDirectories;
51 
52 template <typename T>
53 using ProtobufPtr = iorap::perfetto::ProtobufPtr<T>;
54 
// A perfetto trace still in its serialized (binary wire) form, as read from disk,
// together with the per-input options copied from its CompilationInput.
struct PerfettoTraceProtoInfo {
  /* The perfetto trace proto (serialized bytes; parse with MaybeUnserialize). */
  ::iorap::perfetto::PerfettoTraceProto proto;
  /*
   * The timestamp limit of the trace.
   * It's used to truncate the trace: ftrace events whose timestamp is above this
   * limit are ignored (see SelectPageCacheFtraceEvents).
   */
  uint64_t timestamp_limit_ns;
  /*
   * The pid of the app.
   * If non-negative, it's used to filter out other page cache events
   * (see SelectPageCacheFtraceEvents); a negative value disables the filter.
   */
  int32_t pid;
};
69 
// A successfully parsed perfetto trace together with the per-input options copied
// from its CompilationInput. This is the unit flowing through the compiler's
// observable chains.
struct PerfettoTracePtrInfo {
  /* Deserialized protobuf data containing the perfetto trace. */
  ProtobufPtr<::perfetto::protos::Trace> trace_ptr;
  /*
   * The timestamp limit of the trace.
   * It's used to truncate the trace: ftrace events whose timestamp is above this
   * limit are ignored (see SelectPageCacheFtraceEvents).
   */
  uint64_t timestamp_limit_ns;
  /*
   * The pid of the app.
   * If non-negative, it's used to filter out other page cache events
   * (see SelectPageCacheFtraceEvents); a negative value disables the filter.
   */
  int32_t pid;
};
84 
85 // Attempt to read protobufs from the filenames.
86 // Emits one (or none) protobuf for each filename, in the same order as the filenames.
87 // On any errors, the items are dropped (errors are written to the error LOG).
88 //
89 // All work is done on the same Coordinator as the Subscriber.
90 template <typename ProtoT /*extends MessageLite*/>
ReadProtosFromFileNames(rxcpp::observable<CompilationInput> file_infos)91 auto/*observable<PerfettoTracePtrInfo>*/ ReadProtosFromFileNames(
92     rxcpp::observable<CompilationInput> file_infos) {
93   using BinaryWireProtoT = ::iorap::perfetto::PerfettoTraceProto;
94 
95   return file_infos
96     .map([](const CompilationInput& file_info) ->
97          std::optional<PerfettoTraceProtoInfo> {
98       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << file_info.filename
99                    << " TimeStampLimit "<< file_info.timestamp_limit_ns
100                    << " Pid " << file_info.pid << " [begin]";
101       std::optional<BinaryWireProtoT> maybe_proto =
102           BinaryWireProtoT::ReadFullyFromFile(file_info.filename);
103       if (!maybe_proto) {
104         LOG(ERROR) << "Failed to read file: " << file_info.filename;
105         return std::nullopt;
106       }
107       return {{std::move(maybe_proto.value()), file_info.timestamp_limit_ns, file_info.pid}};
108     })
109     .filter([](const std::optional<PerfettoTraceProtoInfo>& proto_info) {
110       return proto_info.has_value();
111     })
112     .map([](std::optional<PerfettoTraceProtoInfo>& proto_info) ->
113          PerfettoTraceProtoInfo {
114       return proto_info.value();
115     })  // TODO: refactor to something that flattens the optional, and logs in one operator.
116     .map([](PerfettoTraceProtoInfo& proto_info) ->
117          std::optional<PerfettoTracePtrInfo> {
118       std::optional<ProtobufPtr<ProtoT>> t = proto_info.proto.template MaybeUnserialize<ProtoT>();
119       if (!t) {
120         LOG(ERROR) << "Failed to parse protobuf: ";  // TODO: filename.
121         return std::nullopt;
122       }
123       return {{std::move(t.value()), proto_info.timestamp_limit_ns, proto_info.pid}};
124     })
125     .filter([](const std::optional<PerfettoTracePtrInfo>& trace_info) {
126       return trace_info.has_value();
127     })
128     .map([](std::optional<PerfettoTracePtrInfo>& trace_info) ->
129          PerfettoTracePtrInfo {
130       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]";
131       return trace_info.value();
132       // TODO: protobufs have no move constructor. this might be inefficient?
133     });
134 
135 /*
136   return filenames
137     .map([](const std::string& filename) {
138       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames " << filename << " [begin]";
139       std::optional<BinaryWireProtoT> maybe_proto =
140           BinaryWireProtoT::ReadFullyFromFile(filename);
141       if (!maybe_proto) {
142         LOG(ERROR) << "Failed to read file: " << filename;
143       }
144 
145       std::unique_ptr<BinaryWireProtoT> ptr;
146       if (maybe_proto) {
147         ptr.reset(new BinaryWireProtoT{std::move(*maybe_proto)});
148       }
149       return ptr;
150     })
151     .filter([](const std::unique_ptr<BinaryWireProtoT>& proto) {
152       return proto != nullptr;
153     })
154     .map([](std::unique_ptr<BinaryWireProtoT>& proto) {
155       std::optional<ProtoT> t = proto->template MaybeUnserialize<ProtoT>();
156       if (!t) {
157         LOG(ERROR) << "Failed to parse protobuf: ";  // TODO: filename.
158       }
159       return t;
160     })
161     .filter([](const std::optional<ProtoT>& proto) {
162       return proto.has_value();
163     })
164     .map([](std::optional<ProtoT> proto) -> ProtoT {
165       LOG(VERBOSE) << "compiler::ReadProtosFromFileNames [success]";
166       return std::move(proto.value());
167       // TODO: protobufs have no move constructor. this might be inefficient?
168     });
169     */
170 }
171 
ReadPerfettoTraceProtos(std::vector<CompilationInput> file_infos)172 auto/*observable<PerfettoTracePtrInfo>*/ ReadPerfettoTraceProtos(
173     std::vector<CompilationInput> file_infos) {
174   auto filename_obs = rxcpp::observable<>::iterate(std::move(file_infos));
175   rxcpp::observable<PerfettoTracePtrInfo> obs =
176       ReadProtosFromFileNames<::perfetto::protos::Trace>(std::move(filename_obs));
177   return obs;
178 }
179 
180 // A flattened data representation of an MmFileMap*FtraceEvent.
181 // This representation is used for streaming processing.
182 //
183 // Note: Perfetto applies a 'union' over all possible fields on all possible devices
184 // (and uses the max sizeof per-field).
185 //
186 // Since all protobuf fields are optional, fields not present on a particular device are always
187 // null
188 struct PageCacheFtraceEvent {
189   /*
190    * Ftrace buffer-specific
191    */
192   uint32_t cpu;  // e.g. 0-7 for the cpu core number.
193 
194   /*
195    * Ftrace-event general data
196    */
197 
198   // Nanoseconds since an epoch.
199   // Epoch is configurable by writing into trace_clock.
200   // By default this timestamp is CPU local.
201   uint64_t timestamp;
202   // Kernel pid (do not confuse with userspace pid aka tgid)
203   uint32_t pid;
204 
205   // Tagged by our code while parsing the ftraces:
206   uint64_t timestamp_relative;  // timestamp relative to first ftrace within a Trace protobuf.
207   bool add_to_page_cache;  // AddToPageCache=true, DeleteFromPageCache=false.
208 
209   /*
210    * mm_filemap-specific data
211    *
212    * Fields are common:
213    * - MmFilemapAddToPageCacheFtraceEvent
214    * - MmFilemapDeleteFromPageCacheFtraceEvent
215    */
216   uint64_t pfn;    // page frame number (physical) - null on some devices, e.g. marlin
217   uint64_t i_ino;  // inode number (use in conjunction with s_dev)
218   uint64_t index;  // offset into file: this is a multiple of the page size (usually 4096).
219   uint64_t s_dev;  // (dev_t) device number
220   uint64_t page;   // struct page*. - null on some devices, e.g. blueline.
221 
inodeiorap::compiler::PageCacheFtraceEvent222   Inode inode() const {
223     return Inode::FromDeviceAndInode(static_cast<dev_t>(s_dev),
224                                      static_cast<ino_t>(i_ino));
225   }
226 };
227 
operator <<(std::ostream & os,const PageCacheFtraceEvent & e)228 std::ostream& operator<<(std::ostream& os, const PageCacheFtraceEvent& e) {
229   os << "{";
230   os << "cpu:" << e.cpu << ",";
231   os << "timestamp:" << e.timestamp << ",";
232   os << "pid:" << e.pid << ",";
233   os << "timestamp_relative:" << e.timestamp_relative << ",";
234   os << "add_to_page_cache:" << e.add_to_page_cache << ",";
235   os << "pfn:" << e.pfn << ",";
236   os << "i_ino:" << e.i_ino << ",";
237   os << "index:" << e.index << ",";
238   os << "s_dev:" << e.s_dev << ",";
239   os << "page:" << e.page;
240   os << "}";
241 
242   return os;
243 }
244 
245 /*
246  * Gets the start timestamp.
247  *
248  * It is the minimium timestamp.
249  */
GetStartTimestamp(const::perfetto::protos::Trace & trace)250 std::optional<uint64_t> GetStartTimestamp(const ::perfetto::protos::Trace& trace) {
251   std::optional<uint64_t> timestamp_relative_start;
252   // Traverse each timestamp to get the minimium one.
253   for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
254     if (packet.has_timestamp()) {
255       timestamp_relative_start = timestamp_relative_start?
256           std::min(*timestamp_relative_start, packet.timestamp()) : packet.timestamp();
257     }
258     if (!packet.has_ftrace_events()) {
259       continue;
260     }
261     const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle =
262         packet.ftrace_events();
263     for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) {
264       if (event.has_timestamp()) {
265         timestamp_relative_start = timestamp_relative_start?
266             std::min(*timestamp_relative_start, event.timestamp()) : event.timestamp();
267       }
268     }
269   }
270   return timestamp_relative_start;
271 }
272 
273 /*
274  * sample blueline output:
275  *
276  * $ adb shell cat /d/tracing/events/filemap/mm_filemap_add_to_page_cache/format
277  *
278  * name: mm_filemap_add_to_page_cache
279  * ID: 178
280  * format:
281  * 	field:unsigned short common_type;	offset:0;	size:2;	signed:0;
282  * 	field:unsigned char common_flags;	offset:2;	size:1;	signed:0;
283  * 	field:unsigned char common_preempt_count;	offset:3;	size:1;	signed:0;
284  * 	field:int common_pid;	offset:4;	size:4;	signed:1;
285  *
286  * 	field:unsigned long pfn;	offset:8;	size:8;	signed:0;
287  * 	field:unsigned long i_ino;	offset:16;	size:8;	signed:0;
288  * 	field:unsigned long index;	offset:24;	size:8;	signed:0;
289  * 	field:dev_t s_dev;	offset:32;	size:4;	signed:0;
290  *
291  * print fmt: "dev %d:%d ino %lx page=%p pfn=%lu ofs=%lu", ((unsigned int) ((REC->s_dev) >> 20)),
292  *            ((unsigned int) ((REC->s_dev) & ((1U << 20) - 1))), REC->i_ino,
293  *             (((struct page *)(((0xffffffffffffffffUL) - ((1UL) << ((39) - 1)) + 1) -
294  *                 ((1UL) << ((39) - 12 - 1 + 6))) - (memstart_addr >> 12)) + (REC->pfn)),
295  *            REC->pfn, REC->index << 12
296  */
297 
SelectPageCacheFtraceEvents(PerfettoTracePtrInfo trace_info)298 auto /*observable<PageCacheFtraceEvent>*/ SelectPageCacheFtraceEvents(
299     PerfettoTracePtrInfo trace_info) {
300   const ::perfetto::protos::Trace& trace = *(trace_info.trace_ptr);
301 
302   constexpr bool kDebugFunction = true;
303 
304   return rxcpp::observable<>::create<PageCacheFtraceEvent>(
305       [trace=std::move(trace),
306       timestamp_limit_ns=trace_info.timestamp_limit_ns,
307       app_pid=trace_info.pid]
308       (rxcpp::subscriber<PageCacheFtraceEvent> sub) {
309     uint64_t timestamp = 0;
310     uint64_t timestamp_relative = 0;
311 
312     std::optional<uint64_t> timestamp_relative_start = GetStartTimestamp(trace);
313     uint32_t cpu = 0;
314     uint32_t pid = 0;
315     bool add_to_page_cache = true;
316 
317     auto on_next_page_cache_event = [&](const auto& mm_event) {
318       PageCacheFtraceEvent out;
319       out.timestamp = timestamp;
320       out.cpu = cpu;
321       out.pid = pid;
322 
323       out.timestamp_relative = timestamp_relative;
324       out.add_to_page_cache = add_to_page_cache;
325 
326       out.pfn = mm_event.pfn();
327       out.i_ino = mm_event.i_ino();
328       out.index = mm_event.index();
329       out.s_dev = mm_event.s_dev();
330       out.page = mm_event.page();
331 
332       sub.on_next(std::move(out));
333     };
334 
335     for (const ::perfetto::protos::TracePacket& packet : trace.packet()) {
336       // Break out of all loops if we are unsubscribed.
337       if (!sub.is_subscribed()) {
338         if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents unsubscribe";
339         return;
340       }
341 
342       if (kDebugFunction) LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents TracePacket";
343 
344       if (packet.has_timestamp()) {
345         timestamp_relative_start = timestamp_relative_start.value_or(packet.timestamp());
346         timestamp = packet.timestamp();  // XX: should we call 'has_timestamp()' ?
347       } else {
348         timestamp = 0;
349       }
350 
351       if (packet.has_ftrace_events()) {
352         const ::perfetto::protos::FtraceEventBundle& ftrace_event_bundle =
353             packet.ftrace_events();
354 
355         cpu = ftrace_event_bundle.cpu();  // XX: has_cpu ?
356 
357         for (const ::perfetto::protos::FtraceEvent& event : ftrace_event_bundle.event()) {
358           // Break out of all loops if we are unsubscribed.
359           if (!sub.is_subscribed()) {
360             return;
361           }
362 
363           if (app_pid >= 0 &&
364               (!event.has_pid() ||
365                event.pid() != static_cast<uint32_t>(app_pid))) {
366             continue;
367           }
368 
369           if (event.has_timestamp()) {
370             timestamp = event.timestamp();
371             if(timestamp > timestamp_limit_ns) {
372               LOG(VERBOSE) << "The timestamp is " << timestamp <<
373                            ", which exceeds the limit "<< timestamp_limit_ns;
374               continue;
375             }
376           } else {
377             DCHECK(packet.has_timestamp() == false)
378                 << "Timestamp in outer packet but not inner packet";
379             // XX: use timestamp from the perfetto TracePacket ???
380             // REVIEWERS: not sure if this is ok, does it use the same clock source and
381             // is the packet data going to be the same clock sample as the Ftrace event?
382           }
383 
384           if (timestamp_relative_start){
385             timestamp_relative = timestamp - *timestamp_relative_start;
386           } else {
387             timestamp_relative = 0;
388           }
389 
390           pid = event.pid();  // XX: has_pid ?
391 
392           if (event.has_mm_filemap_add_to_page_cache()) {
393             add_to_page_cache = true;
394 
395             const ::perfetto::protos::MmFilemapAddToPageCacheFtraceEvent& mm_event =
396                 event.mm_filemap_add_to_page_cache();
397 
398             on_next_page_cache_event(mm_event);
399           } else if (event.has_mm_filemap_delete_from_page_cache()) {
400             add_to_page_cache = false;
401 
402             const ::perfetto::protos::MmFilemapDeleteFromPageCacheFtraceEvent& mm_event =
403                 event.mm_filemap_delete_from_page_cache();
404 
405             on_next_page_cache_event(mm_event);
406           }
407         }
408       } else {
409         if (kDebugFunction) {
410           LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents no ftrace event bundle";
411         }
412       }
413     }
414 
415     if (kDebugFunction) {
416       LOG(VERBOSE) << "compiler::SelectPageCacheFtraceEvents#on_completed";
417     }
418 
419     // Let subscriber know there are no more items.
420     sub.on_completed();
421   });
422 }
423 
SelectDistinctInodesFromTraces(rxcpp::observable<PerfettoTracePtrInfo> traces)424 auto /*observable<Inode*/ SelectDistinctInodesFromTraces(
425     rxcpp::observable<PerfettoTracePtrInfo> traces) {
426   // Emit only unique (s_dev, i_ino) pairs from all Trace protos.
427   auto obs = traces
428     .flat_map([](PerfettoTracePtrInfo trace) {
429       rxcpp::observable<PageCacheFtraceEvent> obs = SelectPageCacheFtraceEvents(std::move(trace));
430       // FIXME: dont check this in
431       // return obs;
432       //return obs.take(100);   // for faster development
433       return obs;
434     })  // TODO: Upstream bug? using []()::perfetto::protos::Trace&) causes a compilation error.
435     .map([](const PageCacheFtraceEvent& event) -> Inode {
436       return Inode::FromDeviceAndInode(static_cast<dev_t>(event.s_dev),
437                                        static_cast<ino_t>(event.i_ino));
438     })
439     .tap([](const Inode& inode) {
440       LOG(VERBOSE) << "SelectDistinctInodesFromTraces (pre-distinct): " << inode;
441     })
442     .distinct()  // observable<Inode>*/
443     ;
444 
445   return obs;
446 }
447 // TODO: static assert checks for convertible return values.
448 
ResolveInodesToFileNames(rxcpp::observable<Inode> inodes,inode2filename::InodeResolverDependencies dependencies)449 auto/*observable<InodeResult>*/ ResolveInodesToFileNames(
450     rxcpp::observable<Inode> inodes,
451     inode2filename::InodeResolverDependencies dependencies) {
452   std::shared_ptr<inode2filename::InodeResolver> inode_resolver =
453       inode2filename::InodeResolver::Create(std::move(dependencies));
454   return inode_resolver->FindFilenamesFromInodes(std::move(inodes));
455 }
456 
457 using InodeMap = std::unordered_map<Inode, std::string /*filename*/>;
ReduceResolvedInodesToMap(rxcpp::observable<InodeResult> inode_results)458 auto /*just observable<InodeMap>*/ ReduceResolvedInodesToMap(
459       rxcpp::observable<InodeResult> inode_results) {
460   return inode_results.reduce(
461     InodeMap{},
462     [](InodeMap m, InodeResult result) {
463       if (result) {
464         LOG(VERBOSE) << "compiler::ReduceResolvedInodesToMap insert " << result;
465         m.insert({std::move(result.inode), std::move(result.data.value())});
466       } else {
467         // TODO: side stats for how many of these are failed to resolve?
468         LOG(WARNING) << "compiler: Failed to resolve inode, " << result;
469       }
470       return m;
471     },
472     [](InodeMap m) {
473       return m;  // TODO: use an identity function
474     }); // emits exactly 1 InodeMap value.
475 }
476 
// A PageCacheFtraceEvent paired with the filename that its (s_dev, i_ino) inode
// resolved to.
struct ResolvedPageCacheFtraceEvent {
  std::string filename;  // Filename resolved for event.inode().
  PageCacheFtraceEvent event;  // The original ftrace event.
};
481 
operator <<(std::ostream & os,const ResolvedPageCacheFtraceEvent & e)482 std::ostream& operator<<(std::ostream& os, const ResolvedPageCacheFtraceEvent& e) {
483   os << "{";
484   os << "filename:\"" << e.filename << "\",";
485   os << e.event;
486   os << "}";
487 
488   return os;
489 }
490 
491 struct CombinedState {
492   CombinedState() = default;
CombinedStateiorap::compiler::CombinedState493   explicit CombinedState(InodeMap inode_map) : inode_map{std::move(inode_map)} {}
CombinedStateiorap::compiler::CombinedState494   explicit CombinedState(PageCacheFtraceEvent event) : ftrace_event{std::move(event)} {}
495 
CombinedStateiorap::compiler::CombinedState496   CombinedState(InodeMap inode_map, PageCacheFtraceEvent event)
497     : inode_map(std::move(inode_map)),
498       ftrace_event{std::move(event)} {}
499 
500   std::optional<InodeMap> inode_map;
501   std::optional<PageCacheFtraceEvent> ftrace_event;
502 
HasAlliorap::compiler::CombinedState503   bool HasAll() const {
504     return inode_map.has_value() && ftrace_event.has_value();
505   }
506 
GetInodeMapiorap::compiler::CombinedState507   const InodeMap& GetInodeMap() const {
508     DCHECK(HasAll());
509     return inode_map.value();
510   }
511 
GetInodeMapiorap::compiler::CombinedState512   InodeMap& GetInodeMap() {
513     DCHECK(HasAll());
514     return inode_map.value();
515   }
516 
GetEventiorap::compiler::CombinedState517   const PageCacheFtraceEvent& GetEvent() const {
518     DCHECK(HasAll());
519     return ftrace_event.value();
520   }
521 
GetEventiorap::compiler::CombinedState522   PageCacheFtraceEvent& GetEvent() {
523     DCHECK(HasAll());
524     return ftrace_event.value();
525   }
526 
Mergeiorap::compiler::CombinedState527   void Merge(CombinedState&& other) {
528     if (other.inode_map) {
529       inode_map = std::move(other.inode_map);
530     }
531     if (other.ftrace_event) {
532       ftrace_event = std::move(other.ftrace_event);
533     }
534   }
535 };
536 
operator <<(std::ostream & os,const CombinedState & s)537 std::ostream& operator<<(std::ostream& os, const CombinedState& s) {
538   os << "CombinedState{inode_map:";
539   if (s.inode_map) {
540     os << "|sz=" << (s.inode_map.value().size()) << "|";
541   } else {
542     os << "(null)";
543   }
544   os << ",event:";
545   if (s.ftrace_event) {
546     //os << s.ftrace_event.value().timestamp << "ns";
547     os << s.ftrace_event.value();
548   } else {
549     os << "(null)";
550   }
551   os << "}";
552   return os;
553 }
554 
ResolvePageCacheEntriesFromProtos(rxcpp::observable<PerfettoTracePtrInfo> traces,inode2filename::InodeResolverDependencies dependencies)555 auto/*observable<ResolvedPageCacheFtraceEvent>*/ ResolvePageCacheEntriesFromProtos(
556     rxcpp::observable<PerfettoTracePtrInfo> traces,
557     inode2filename::InodeResolverDependencies dependencies) {
558 
559   // 1st chain = emits exactly 1 InodeMap.
560 
561   // [proto, proto, proto...] -> [inode, inode, inode, ...]
562   auto/*observable<Inode>*/ distinct_inodes = SelectDistinctInodesFromTraces(traces);
563   rxcpp::observable<Inode> distinct_inodes_obs = distinct_inodes.as_dynamic();
564   // [inode, inode, inode, ...] -> [(inode, {filename|error}), ...]
565   auto/*observable<InodeResult>*/ inode_names = ResolveInodesToFileNames(distinct_inodes_obs,
566                                                                          std::move(dependencies));
567   // rxcpp has no 'join' operators, so do a manual join with concat.
568   auto/*observable<InodeMap>*/ inode_name_map = ReduceResolvedInodesToMap(inode_names);
569 
570   // 2nd chain = emits all PageCacheFtraceEvent
571   auto/*observable<PageCacheFtraceEvent>*/ page_cache_ftrace_events = traces
572     .flat_map([](PerfettoTracePtrInfo trace) {
573       rxcpp::observable<PageCacheFtraceEvent> obs = SelectPageCacheFtraceEvents(std::move(trace));
574       return obs;
575     });
576 
577   auto inode_name_map_precombine = inode_name_map
578     .map([](InodeMap inode_map) {
579       LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#inode_name_map_precombine ";
580       return CombinedState{std::move(inode_map)};
581     });
582 
583   auto page_cache_ftrace_events_precombine = page_cache_ftrace_events
584     .map([](PageCacheFtraceEvent event) {
585       LOG(VERBOSE)
586           << "compiler::ResolvePageCacheEntriesFromProtos#page_cache_ftrace_events_precombine "
587           << event;
588       return CombinedState{std::move(event)};
589     });
590 
591   // Combine 1st+2nd chain.
592   //
593   // concat subscribes to each observable, waiting until its completed, before subscribing
594   // to the next observable and waiting again.
595   //
596   // During all this, every #on_next is immediately forwarded to the downstream observables.
597   // In our case, we want to block until InodeNameMap is ready, and re-iterate all ftrace events.
598   auto/*observable<ResolvedPageCacheFtraceEvent>*/ resolved_events = inode_name_map_precombine
599     .concat(page_cache_ftrace_events_precombine)
600     .scan(CombinedState{},
601           [](CombinedState current_state, CombinedState delta_state) {
602             LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos#scan "
603                           << "current=" << current_state << ","
604                           << "delta=" << delta_state;
605             // IT0    = (,)               + (InodeMap,)
606             // IT1    = (InodeMap,)       + (,Event)
607             // IT2..N = (InodeMap,Event1) + (,Event2)
608             current_state.Merge(std::move(delta_state));
609             return current_state;
610           })
611     .filter([](const CombinedState& state) {
612       return state.HasAll();
613     })
614     .map([](CombinedState& state) -> std::optional<ResolvedPageCacheFtraceEvent> {
615       PageCacheFtraceEvent& event = state.GetEvent();
616       const InodeMap& inode_map = state.GetInodeMap();
617 
618       auto it = inode_map.find(event.inode());
619       if (it != inode_map.end()) {
620         std::string filename = it->second;
621         LOG(VERBOSE) << "compiler::ResolvePageCacheEntriesFromProtos combine_latest " << event;
622         return ResolvedPageCacheFtraceEvent{std::move(filename), std::move(event)};
623       } else {
624         LOG(ERROR) << "compiler: FtraceEvent's inode did not have resolved filename: " << event;
625         return std::nullopt;
626       }
627     })
628     .filter(
629       [](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
630         return maybe_event.has_value();
631       })
632     .map([](std::optional<ResolvedPageCacheFtraceEvent> maybe_event) {
633       return std::move(maybe_event.value());
634     });
635     // -> observable<ResolvedPageCacheFtraceEvent>
636 
637   return resolved_events;
638 }
639 
namespace detail {
// "a < b" for strings: element-wise lexicographical comparison.
bool multiless_one(const std::string& a, const std::string& b) {
  return std::lexicographical_compare(a.cbegin(), a.cend(),
                                      b.cbegin(), b.cend());
}

// "a < b" for any other type, via std::less on the decayed type.
template <typename T>
constexpr bool multiless_one(T&& a, T&& b) {
  return std::less<std::decay_t<T>>{}(std::forward<T>(a), std::forward<T>(b));
}

// Base case: two empty sequences are equal, so [] < [] is false.
constexpr bool multiless() {
  return false;
}

// Decide on the first pair that differs; a tie defers to the remaining pairs.
template <typename T, typename ... Args>
constexpr bool multiless(T&& a, T&& b, Args&&... args) {
  return (a != b) ? multiless_one(std::forward<T>(a), std::forward<T>(b))
                  : multiless(std::forward<Args>(args)...);
}

}  // namespace detail

// Return [A0...An] < [B0...Bn] ; vector-like scalar comparison of each field.
// Arguments are passed interleaved: A0,B0,A1,B1,...,An,Bn (an even count is required).
template <typename ... Args>
constexpr bool multiless(Args&&... args) {
  return detail::multiless(std::forward<Args>(args)...);
}
673 
674 struct CompilerPageCacheEvent {
675   std::string filename;
676   uint64_t timestamp_relative;  // use relative timestamp because absolute values aren't comparable
677                                 // across different trace protos.
678                                 // relative timestamps can be said to be 'approximately' comparable.
679                                 // assuming we compare the same application startup's trace times.
680   bool add_to_page_cache;  // AddToPageCache=true, DeleteFromPageCache=false.
681   uint64_t index;          // offset into file: this is a multiple of the page size (usually 4096).
682 
683   // All other data from the ftrace is dropped because we don't currently use it in the
684   // compiler algorithms.
685 
686   CompilerPageCacheEvent() = default;
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent687   CompilerPageCacheEvent(const ResolvedPageCacheFtraceEvent& resolved)
688     : CompilerPageCacheEvent(resolved.filename, resolved.event) {
689   }
690 
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent691   CompilerPageCacheEvent(ResolvedPageCacheFtraceEvent&& resolved)
692     : CompilerPageCacheEvent(std::move(resolved.filename), std::move(resolved.event)) {
693   }
694 
695   // Compare all fields (except the timestamp field).
LessIgnoringTimestampiorap::compiler::CompilerPageCacheEvent696   static bool LessIgnoringTimestamp(const CompilerPageCacheEvent& a,
697                                     const CompilerPageCacheEvent& b) {
698     return multiless(a.filename, b.filename,
699                      a.add_to_page_cache, b.add_to_page_cache,
700                      a.index, b.index);
701   }
702 
703   // Compare all fields. Timestamps get highest precedence.
operator <iorap::compiler::CompilerPageCacheEvent704   bool operator<(const CompilerPageCacheEvent& rhs) const {
705     return multiless(timestamp_relative, rhs.timestamp_relative,
706                      filename, rhs.filename,
707                      add_to_page_cache, rhs.add_to_page_cache,
708                      index, rhs.index);
709   }
710 
711  private:
CompilerPageCacheEventiorap::compiler::CompilerPageCacheEvent712   CompilerPageCacheEvent(std::string filename, const PageCacheFtraceEvent& event)
713     : filename(std::move(filename)),
714       timestamp_relative(event.timestamp_relative),
715       add_to_page_cache(event.add_to_page_cache),
716       index(event.index) {
717    }
718 };
719 
operator <<(std::ostream & os,const CompilerPageCacheEvent & e)720 std::ostream& operator<<(std::ostream& os, const CompilerPageCacheEvent& e) {
721   os << "{";
722   os << "filename:\"" << e.filename << "\",";
723   os << "timestamp:" << e.timestamp_relative << ",";
724   os << "add_to_page_cache:" << e.add_to_page_cache << ",";
725   os << "index:" << e.index;
726   os << "}";
727   return os;
728 }
729 
730 // Filter an observable chain of 'ResolvedPageCacheFtraceEvent'
731 // into an observable chain of 'ResolvedPageCacheFtraceEvent'.
732 //
733 // Any items emitted by the input chain that match the regular expression
734 // specified by blacklist_filter are not emitted into the output chain.
ApplyBlacklistToPageCacheEvents(rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events,std::optional<std::string> blacklist_filter)735 auto/*observable<ResolvedPageCacheFtraceEvent>*/ ApplyBlacklistToPageCacheEvents(
736     rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events,
737     std::optional<std::string> blacklist_filter) {
738   bool has_re = blacklist_filter.has_value();
739   // default regex engine is ecmascript.
740   std::regex reg_exp{blacklist_filter ? *blacklist_filter : std::string("")};
741 
742   return resolved_events.filter(
743     [reg_exp, has_re](const ResolvedPageCacheFtraceEvent& event) {
744       if (!has_re) {
745         return true;
746       }
747       // Remove any entries that match the regex in --blacklist-filter/-bf.
748       bool res = std::regex_search(event.filename, reg_exp);
749       if (res) {
750         LOG(VERBOSE) << "Blacklist filter removed '" << event.filename << "' from chain.";
751       }
752       return !res;
753     });
754 }
755 
756 // Compile an observable chain of 'ResolvedPageCacheFtraceEvent' into
757 // an observable chain of distinct, timestamp-ordered, CompilerPageCacheEvent.
758 //
759 // This is a reducing operation: No items are emitted until resolved_events is completed.
CompilePageCacheEvents(rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events)760 auto/*observable<CompilerPageCacheEvent>*/ CompilePageCacheEvents(
761     rxcpp::observable<ResolvedPageCacheFtraceEvent> resolved_events) {
762 
763   struct CompilerPageCacheEventIgnoringTimestampLess {
764     bool operator()(const CompilerPageCacheEvent& lhs,
765                     const CompilerPageCacheEvent& rhs) const {
766       return CompilerPageCacheEvent::LessIgnoringTimestamp(lhs, rhs);
767     }
768   };
769 
770   // Greedy O(N) compilation algorithm.
771   //
772   // This produces an inoptimal result (e.g. a small timestamp
773   // that might occur only 1% of the time nevertheless wins out), but the
774   // algorithm itself is quite simple, and doesn't require any heuristic tuning.
775 
776   // First pass: *Merge* into set that ignores the timestamp value for order, but retains
777   //             the smallest timestamp value if the same key is re-inserted.
778   using IgnoreTimestampForOrderingSet =
779       std::set<CompilerPageCacheEvent, CompilerPageCacheEventIgnoringTimestampLess>;
780   // Second pass: *Sort* data by smallest timestamp first.
781   using CompilerPageCacheEventSet =
782       std::set<CompilerPageCacheEvent>;
783 
784   return resolved_events
785     .map(
786       [](ResolvedPageCacheFtraceEvent event) {
787         // Drop all the extra metadata like pid, cpu, etc.
788         // When we merge we could keep a list of the original data, but there is no advantage
789         // to doing so.
790         return CompilerPageCacheEvent{std::move(event)};
791       }
792     )
793    .reduce(
794     IgnoreTimestampForOrderingSet{},
795     [](IgnoreTimestampForOrderingSet set, CompilerPageCacheEvent event) {
796       // Add each event to the set, keying by everything but the timestamp.
797       // If the key is already inserted, re-insert with the smaller timestamp value.
798       auto it = set.find(event);
799 
800       if (it == set.end()) {
801         // Need to insert new element.
802         set.insert(std::move(event));
803       } else if (it->timestamp_relative > event.timestamp_relative) {
804         // Replace existing element: the new element has a smaller timestamp.
805         it = set.erase(it);
806         // Amortized O(1) time if insertion happens in the position before the hint.
807         set.insert(it, std::move(event));
808       } // else: Skip insertion. Element already present with the minimum timestamp.
809 
810       return set;
811     },
812     [](IgnoreTimestampForOrderingSet set) {
813       // Extract all elements from 'set', re-insert into 'ts_set'.
814       // The values are now ordered by timestamp (and then the rest of the fields).
815       CompilerPageCacheEventSet ts_set;
816       ts_set.merge(std::move(set));
817 
818 
819       std::shared_ptr<CompilerPageCacheEventSet> final_set{
820           new CompilerPageCacheEventSet{std::move(ts_set)}};
821       return final_set;
822       // return ts_set;
823     })  // observable<CompilerPageCacheEventSet> (just)
824   .flat_map(
825     [](std::shared_ptr<CompilerPageCacheEventSet> final_set) {
826       // TODO: flat_map seems to make a copy of the parameter _every single iteration_
827       // without the shared_ptr it would just make a copy of the set every time it went
828       // through the iterate function.
829       // Causing absurdly slow compile times x1000 slower than we wanted.
830       // TODO: file a bug upstream and/or fix upstream.
831       CompilerPageCacheEventSet& ts_set = *final_set;
832     // [](CompilerPageCacheEventSet& ts_set) {
833       LOG(DEBUG) << "compiler: Merge-pass completed (" << ts_set.size() << " entries).";
834       //return rxcpp::sources::iterate(std::move(ts_set));
835       return rxcpp::sources::iterate(ts_set).map([](CompilerPageCacheEvent e) { return e; });
836     }
837   );   // observable<CompilerPageCacheEvent>
838 }
839 
840 /** Makes a vector of info that includes filename and timestamp limit. */
MakeCompilationInputs(std::vector<std::string> input_file_names,std::vector<uint64_t> timestamp_limit_ns,std::vector<int32_t> pids)841 std::vector<CompilationInput> MakeCompilationInputs(
842     std::vector<std::string> input_file_names,
843     std::vector<uint64_t> timestamp_limit_ns,
844     std::vector<int32_t> pids){
845   // If the timestamp limit is empty, set the limit to max value
846   // for each trace file.
847   if (timestamp_limit_ns.empty()) {
848     for (size_t i = 0; i < input_file_names.size(); i++) {
849       timestamp_limit_ns.push_back(std::numeric_limits<uint64_t>::max());
850     }
851   }
852 
853   // If the pids is empty, set all of them to -1. Because negative pid means any.
854   if (pids.empty()) {
855     for (size_t i = 0; i < input_file_names.size(); i++) {
856       pids.push_back(-1);
857     }
858   }
859 
860   DCHECK_EQ(input_file_names.size(), timestamp_limit_ns.size());
861   std::vector<CompilationInput> file_infos;
862   for (size_t i = 0; i < input_file_names.size(); i++) {
863     file_infos.push_back({input_file_names[i], timestamp_limit_ns[i], pids[i]});
864   }
865   return file_infos;
866 }
867 
PerformCompilation(std::vector<CompilationInput> perfetto_traces,std::string output_file_name,bool output_proto,std::optional<std::string> blacklist_filter,inode2filename::InodeResolverDependencies dependencies)868 bool PerformCompilation(std::vector<CompilationInput> perfetto_traces,
869                         std::string output_file_name,
870                         bool output_proto,
871                         std::optional<std::string> blacklist_filter,
872                         inode2filename::InodeResolverDependencies dependencies) {
873   auto trace_protos = ReadPerfettoTraceProtos(std::move(perfetto_traces));
874   auto resolved_events = ResolvePageCacheEntriesFromProtos(std::move(trace_protos),
875                                                            std::move(dependencies));
876   auto filtered_events =
877       ApplyBlacklistToPageCacheEvents(std::move(resolved_events), blacklist_filter);
878   auto compiled_events = CompilePageCacheEvents(std::move(filtered_events));
879 
880   std::ofstream ofs;
881   if (!output_file_name.empty()) {
882 
883     if (!output_proto) {
884       ofs.open(output_file_name);
885 
886       if (!ofs) {
887         LOG(ERROR) << "compiler: Failed to open output file for writing: " << output_file_name;
888         return false;
889       }
890     }
891   }
892 
893   auto trace_file_proto = serialize::ArenaPtr<serialize::proto::TraceFile>::Make();
894 
895   // Fast lookup of filename -> FileIndex id.
896   std::unordered_map<std::string, int64_t /*file handle id*/> file_path_map;
897   int64_t file_handle_id = 0;
898 
899   int counter = 0;
900   compiled_events
901     // .as_blocking()
902     .tap([&](CompilerPageCacheEvent& event) {
903       if (!output_proto) {
904         return;
905       }
906 
907       if (!event.add_to_page_cache) {
908         // Skip DeleteFromPageCache events, they are only used for intermediate.
909         return;
910       }
911 
912       DCHECK(trace_file_proto->mutable_index() != nullptr);
913       serialize::proto::TraceFileIndex& index = *trace_file_proto->mutable_index();
914       int64_t file_handle;
915 
916       // Add TraceFileIndexEntry if it doesn't exist.
917 
918       auto it = file_path_map.find(event.filename);
919       if (it == file_path_map.end()) {
920         file_handle = file_handle_id++;
921         file_path_map[event.filename] = file_handle;
922 
923         serialize::proto::TraceFileIndexEntry* entry = index.add_entries();
924         DCHECK(entry != nullptr);
925         entry->set_id(file_handle);
926         entry->set_file_name(event.filename);
927 
928         if (kIsDebugBuild) {
929           int i = static_cast<int>(file_handle);
930           const serialize::proto::TraceFileIndexEntry& entry_ex = index.entries(i);
931           DCHECK_EQ(entry->id(), entry_ex.id());
932           DCHECK_EQ(entry->file_name(), entry_ex.file_name());
933         }
934       } else {
935         file_handle = it->second;
936       }
937       int kPageSize = 4096;  // TODO: don't hardcode the page size.
938 
939       int entry_size = trace_file_proto->list().entries_size();
940       bool merged = false;
941       if (entry_size > 0) {
942         serialize::proto::TraceFileEntry* entry =
943             trace_file_proto->mutable_list()->mutable_entries(entry_size-1);
944         if (entry->index_id() == file_handle &&
945             entry->file_offset() + entry->file_length() ==
946             static_cast<int64_t>(event.index) * kPageSize) {
947           entry->set_file_length(entry->file_length() + kPageSize);
948           merged = true;
949         }
950       }
951 
952       if (!merged) {
953         // Add TraceFileEntry.
954         DCHECK(trace_file_proto->mutable_list() != nullptr);
955         serialize::proto::TraceFileEntry* entry = trace_file_proto->mutable_list()->add_entries();
956         DCHECK(entry != nullptr);
957 
958         entry->set_index_id(file_handle);
959         // Page index -> file offset in bytes.
960         entry->set_file_offset(static_cast<int64_t>(event.index) * kPageSize);
961         entry->set_file_length(kPageSize);
962       }
963     })
964     .subscribe([&](CompilerPageCacheEvent event) {
965       if (!output_proto) {
966         if (output_file_name.empty()) {
967           LOG(INFO) << "CompilerPageCacheEvent" << event << std::endl;
968         } else {
969           ofs << event << "\n";  // TODO: write in proto format instead.
970         }
971       }
972       ++counter;
973     });
974 
975   if (output_proto) {
976     LOG(DEBUG) << "compiler: WriteFully to begin into " << output_file_name;
977     ::google::protobuf::MessageLite& message = *trace_file_proto.get();
978     if (auto res = serialize::ProtobufIO::WriteFully(message, output_file_name); !res) {
979       errno = res.error();
980       PLOG(ERROR) << "compiler: Failed to write protobuf to file: " << output_file_name;
981       return false;
982     } else {
983       LOG(INFO) << "compiler: Wrote protobuf " << output_file_name;
984     }
985   }
986 
987   LOG(DEBUG) << "compiler: Compilation completed (" << counter << " events).";
988 
989   return true;
990 }
991 
992 }  // namespace iorap::compiler
993