1 // Copyright (C) 2018 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "common/debug.h"
16 #include "inode2filename/search_directories.h"
17 #include "inode2filename/system_call.h"
18 
19 #include <android-base/file.h>
20 #include <android-base/logging.h>
21 #include <android-base/scopeguard.h>
22 #include <android-base/stringprintf.h>
23 #include <android-base/unique_fd.h>
24 
25 #include "rxcpp/rx.hpp"
26 
27 #include <iostream>
28 #include <stdio.h>
29 #include <fstream>
30 #include <vector>
31 #include <optional>
32 
33 #include <signal.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 
37 #include <sys/types.h>
38 
39 #ifdef __ANDROID__
40 #include <sys/sysmacros.h>
41 #endif
42 
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <poll.h>
46 #include <dirent.h>
47 
48 #include <unordered_map>
49 
50 namespace rx = rxcpp;
51 using android::base::unique_fd;  // NOLINT
52 using android::base::StringPrintf;  // NOLINT
53 
54 namespace iorap::inode2filename {
55 
56 #define DEBUG_INODE_SET 0
57 
58 // A multimap of 'ino_t -> List[Inode]' (where the value Inodes have the same ino_t as the key).
59 //
60 // A flat list of Inodes is turned into the above map, then keys can be removed one at a time
61 // until the InodeSet eventually becomes empty.
62 struct InodeSet {
63 
64   InodeSet() = default;
65 #if DEBUG_INODE_SET
InodeSetiorap::inode2filename::InodeSet66   InodeSet(const InodeSet& other) {
67     LOG(INFO) << "InodeSet-copyctor";
68     set_ = other.set_;
69   }
70 
InodeSetiorap::inode2filename::InodeSet71   InodeSet(InodeSet&& other) {
72     LOG(INFO) << "InodeSet-movector";
73     set_ = std::move(other.set_);
74   }
75 
operator =iorap::inode2filename::InodeSet76   InodeSet& operator=(const InodeSet& other) {
77     LOG(INFO) << "InodeSet-opassign-copy";
78     set_ = other.set_;
79     return *this;
80   }
81 
operator =iorap::inode2filename::InodeSet82   InodeSet& operator=(InodeSet&& other) {
83     LOG(INFO) << "InodeSet-opassign-move";
84     set_ = std::move(other.set_);
85     return *this;
86   }
87 #else
88   InodeSet(InodeSet&& other) = default;
89   InodeSet& operator=(InodeSet&& other) = default;
90   // Copying InodeSet can be very expensive, refuse to even allow compiling such code.
91   InodeSet(const InodeSet& other) = delete;
92   InodeSet& operator=(const InodeSet& other) = delete;
93 #endif
94 
95   struct ValueRange {
beginiorap::inode2filename::InodeSet::ValueRange96     auto/*Iterable<Inode>*/ begin() {
97       return begin_;
98     }
99 
endiorap::inode2filename::InodeSet::ValueRange100     auto/*Iterable<Inode>*/ end() {
101       return end_;
102     }
103 
emptyiorap::inode2filename::InodeSet::ValueRange104     bool empty() const {
105       return begin_ == end_;
106     }
107 
operator booliorap::inode2filename::InodeSet::ValueRange108     explicit operator bool() const {
109       return !empty();
110     }
111 
112     std::unordered_multimap<ino_t, Inode>::iterator begin_, end_;
113 
114     friend std::ostream& operator<<(std::ostream& os, const ValueRange& s);
115   };
116 
117   // Create an observable that emits the remaining inodes in the map.
118   //
119   // Mutation functions must not be called until this observable
120   // has been finished emitting all values (e.g. with on_completed) since that
121   // would cause the underlying iterators to go into an undefined state.
IterateValuesiorap::inode2filename::InodeSet122   auto/*observable<Inode>*/ IterateValues() const {
123     return rxcpp::observable<>::iterate(set_).map(  // XX: should we use identity_immediate here?
124         [](const std::pair<const ino_t, Inode>& pair) {
125           return pair.second;
126         }
127     );
128     // TODO: this would be more efficient as a range-v3 view.
129   }
130 
Emptyiorap::inode2filename::InodeSet131   constexpr bool Empty() const {
132     return set_.empty();
133   }
134 
OfListiorap::inode2filename::InodeSet135   static InodeSet OfList(const std::vector<Inode>& list) {
136     InodeSet new_inode_set;
137     std::unordered_multimap<ino_t, Inode>* map = &new_inode_set.set_;
138 
139     for (const Inode& inode : list) {
140       map->insert({inode.inode, inode});
141     }
142 
143     return new_inode_set;
144   }
145 
146   // Return an optional list of 'Inode' structs whose 'inode' field matches the 'inode' parameter.
147   // Returns an empty range if there was nothing found.
FindInodeListiorap::inode2filename::InodeSet148   ValueRange FindInodeList(ino_t inode) {
149     auto range = set_.equal_range(inode);
150     return ValueRange{range.first, range.second};
151   }
152 
153   // Match all fields of an Inode against a 'struct stat' stat_buf.
154   //
155   // The returned Inode (if any) is removed from the InodeSet; it will not be returned by
156   // FindInodeList in future calls.
FindAndRemoveInodeInListiorap::inode2filename::InodeSet157   std::optional<Inode> FindAndRemoveInodeInList(ValueRange inode_list,
158                                                 const struct stat& stat_buf) {
159     LOG(VERBOSE) << "FindAndRemoveInodeInList " << inode_list << ", "
160                  << "stat_buf{st_dev=" << stat_buf.st_dev << ",st_ino=" << stat_buf.st_ino << "}";
161 
162     auto /*iterator*/ found = std::find_if(inode_list.begin(),
163                                            inode_list.end(),
164                                            [&](const std::pair<ino_t, Inode>& pair) {
165       const Inode& inode = pair.second;
166       if (inode.inode != stat_buf.st_ino) {
167         return false;
168       }
169 
170       dev_t inode_dev =
171           makedev(static_cast<int>(inode.device_major), static_cast<int>(inode.device_minor));
172 
173       // Inodes could be the same across different devices.
174       // Also match the device id.
175       if (inode_dev != stat_buf.st_dev) {
176         LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList matched ino: " << inode.inode
177                      << " but not device"
178                      << ", expected dev: " << stat_buf.st_dev
179                      << ", actual dev: " << inode_dev;
180         return false;
181       }
182       return true;
183     });
184 
185     if (found != inode_list.end()) {
186       Inode inode = found->second;
187       LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList *success* inode+device " << inode;
188       DCHECK(found->second.inode == stat_buf.st_ino);
189       // Erase the inode from the list. This is important.
190       set_.erase(found);
191       return inode;
192     }
193 
194     return std::nullopt;
195   }
196 
197   // Match all fields of an Inode against another Inode.
198   //
199   // The returned Inode (if any) is removed from the InodeSet; it will not be returned by
200   // FindInodeList in future calls.
FindAndRemoveInodeInListiorap::inode2filename::InodeSet201   std::optional<Inode> FindAndRemoveInodeInList(ValueRange inode_list,
202                                                 const Inode& inode) {
203     LOG(VERBOSE) << "FindAndRemoveInodeInList " << inode_list << ", "
204                  << inode << "}";
205 
206     auto /*iterator*/ found = std::find_if(inode_list.begin(),
207                                            inode_list.end(),
208                                            [&](const std::pair<ino_t, Inode>& pair) {
209       return inode == pair.second;
210     });
211 
212     if (found != inode_list.end()) {
213       Inode inode = found->second;
214       LOG(VERBOSE) << "InodeSet:FindAndRemoveInodeInList *success* inode+device " << inode;
215       DCHECK_EQ(found->second, inode);
216       // Erase the inode from the list. This is important.
217       set_.erase(found);
218       return inode;
219     }
220 
221     return std::nullopt;
222   }
223 
224   // TODO: equality and string operators for testing/logging.
225  private:
226   // Explanation: readdir returns a 'file' -> 'ino_t inode' mapping.
227   //
228   // However inodes can be reused on different partitions (but they have a different device number).
229   // To handle this edge case, and to avoid calling stat whenever the inode definitely doesn't match
230   // store the inodes into a single-key,multi-value container.
231   //
232   // This enables fast scanning of readdir results by matching just the 'inode' portion,
233   // then calling stat only when the inode portion definitely matches to confirm the device.
234 
235   // There are no single-key multi-value containers in standard C++, so pretend
236   // we have one by writing this simple facade around an unordered set.
237   //
238   // We expect that the vector size is usually size=1 (or 2 or 3) since the # of devices
239   // is fixed by however many partitions there are on the system, AND the same inode #
240   // would have to be reused across a different file.
241   std::unordered_multimap<ino_t, Inode> set_;  // TODO: Rename to map_.
242 
243   friend std::ostream& operator<<(std::ostream& os, const InodeSet& s);
244 };
245 
operator <<(std::ostream & os,const InodeSet & s)246 std::ostream& operator<<(std::ostream& os, const InodeSet& s) {
247   os << "InodeSet{";
248   for (const auto& kv : s.set_) {
249     // e.g. "123=>(1:2:123)" ... its expected for the 'ino_t' portion to be repeated.
250     os << "" << kv.first << "=>(" << kv.second << "),";
251   }
252   os << "}";
253   return os;
254 }
255 
operator <<(std::ostream & os,const InodeSet::ValueRange & v)256 std::ostream& operator<<(std::ostream& os, const InodeSet::ValueRange& v) {
257   // Don't want to make a const and non const version of ValueRange.
258   InodeSet::ValueRange& s = const_cast<InodeSet::ValueRange&>(v);
259 
260   os << "InodeSet::ValueRange{";
261   for (const auto& kv : s) {
262     // e.g. "123=>(1:2:123)" ... its expected for the 'ino_t' portion to be repeated.
263     os << "" << kv.first << "=>(" << kv.second << "),";
264   }
265   os << "}";
266   return os;
267 }
268 
// Forward declaration only; no definition is visible in this part of the file.
// NOTE(review): presumably searches 'dirpath' for the inodes in 'inode_list' -- confirm
// against the definition (and whether this declaration is still needed).
void search_for_inodes_in(std::vector<Inode>& inode_list, const std::string& dirpath);
270 
// Reasons why a directory entry could not be produced while walking the filesystem.
enum DirectoryEntryErrorCode {
  // Not a real error code; lets bad (zero) initialization be detected.
  kInvalid = 0,
  // opendir failed.
  kOpenDir = 1,
  // readdir failed.
  kReadDir = 2,
  // The entry's d_type was DT_UNKNOWN.
  kDtUnknown = 3,
};
277 
278 struct DirectoryEntryError {
279   DirectoryEntryErrorCode code;
280   int err_no;
281   std::string filename;
282 };
283 
operator <<(std::ostream & os,const DirectoryEntryError & e)284 std::ostream& operator<<(std::ostream& os, const DirectoryEntryError& e) {
285   os << "DirectoryEntryError{"
286      << static_cast<int>(e.code) << "," << e.err_no << "," << e.filename << "}";
287   return os;
288   // TODO: pretty-print code and err-no
289 }
290 
// Counter used by the DirectoryEntry constructors to hand out a debug id (debug_counter_).
static common::DebugCounter gDebugDirectoryEntryCounter{};
// Compile-time switch: when true, DIRECTORY_ENTRY_TRACE_CTOR() emits a VERBOSE log line.
static constexpr bool kDebugDirectoryEntry = false;

// Debug-checks that 'other' (which must be a DirectoryEntry named 'other' in the expanding
// scope) has not been moved from. Note: the expansion already ends with a semicolon.
#define DIRECTORY_ENTRY_MOVE_DCHECK() \
    DCHECK_EQ(other.moved_from_, false) << __PRETTY_FUNCTION__ << "CNT:" << other.debug_counter_;
// Logs the current function and this object's debug_counter_ when kDebugDirectoryEntry is set.
// Must be expanded inside a DirectoryEntry member function. Expands to an unbraced
// 'if (...) statement' without a trailing semicolon, so only use it as a standalone statement
// (never directly before an 'else').
#define DIRECTORY_ENTRY_TRACE_CTOR() \
    if (kDebugDirectoryEntry) LOG(VERBOSE) << __PRETTY_FUNCTION__ << "@CNT:" << debug_counter_
298 
299 struct DirectoryEntry {
300   using ResultT = iorap::expected<DirectoryEntry, DirectoryEntryError>;
301   using ObservableT = rx::observable<ResultT>;
302 
303   static constexpr ino_t kInvalidIno = std::numeric_limits<ino_t>::max();
304   static constexpr auto kInvalidFileName = "";
305 
306   // Path to file, the prefix is one of the root directories.
307   std::string filename{kInvalidFileName};
308   // Inode number of the file. Not unique across different devices.
309   ino_t d_ino{kInvalidIno};
310   // File type (DT_LNK, DT_REG, DT_DIR, or DT_UNKNOWN)
311   unsigned char d_type{DT_UNKNOWN};  // Note: not seen outside of sentinel roots.
312   // TODO: Consider invariant checks for valid combinations of above fields?
313 
314   // Debug-only flags.
315   bool moved_from_{false};
316   size_t debug_counter_{0};
317 
318  private:
319   // TODO: remove default constructor?
320   //
321   // SEEMS TO BE USED by std::vector etc. FIX DAT.
DirectoryEntryiorap::inode2filename::DirectoryEntry322   DirectoryEntry() noexcept {
323     debug_counter_ = gDebugDirectoryEntryCounter++;
324     DIRECTORY_ENTRY_TRACE_CTOR();
325   }
326  public:
DirectoryEntryiorap::inode2filename::DirectoryEntry327   DirectoryEntry(std::string filename, ino_t d_ino, unsigned char d_type) noexcept
328     : filename{std::move(filename)},
329       d_ino{d_ino},
330       d_type{d_type} {
331     debug_counter_ = gDebugDirectoryEntryCounter++;
332     DIRECTORY_ENTRY_TRACE_CTOR();
333   }
334 
DirectoryEntryiorap::inode2filename::DirectoryEntry335   DirectoryEntry(const DirectoryEntry& other) noexcept {
336     // Do not use member-initialization syntax so that this DCHECK can execute first.
337     DIRECTORY_ENTRY_MOVE_DCHECK();
338 
339     filename = other.filename;
340     d_ino = other.d_ino;
341     d_type = other.d_type;
342     children_paths_ = other.children_paths_;
343     children_initialized_ = other.children_initialized_;
344     debug_counter_ = other.debug_counter_;
345     DIRECTORY_ENTRY_TRACE_CTOR();
346   }
347 
operator =iorap::inode2filename::DirectoryEntry348   DirectoryEntry& operator=(const DirectoryEntry& other) noexcept {
349     if (this == &other) {
350       return *this;
351     }
352 
353     DIRECTORY_ENTRY_MOVE_DCHECK();
354 
355     filename = other.filename;
356     d_ino = other.d_ino;
357     d_type = other.d_type;
358     children_paths_ = other.children_paths_;
359     children_initialized_ = other.children_initialized_;
360     debug_counter_ = other.debug_counter_;
361     DIRECTORY_ENTRY_TRACE_CTOR();
362 
363     return *this;
364   }
365 
operator =iorap::inode2filename::DirectoryEntry366   DirectoryEntry& operator=(DirectoryEntry&& other) noexcept {
367     if (this == &other) {
368       return *this;
369     }
370 
371     DIRECTORY_ENTRY_MOVE_DCHECK();
372 
373     filename = std::move(other.filename);
374     d_ino = other.d_ino;
375     d_type = other.d_type;
376     children_paths_ = std::move(other.children_paths_);
377     children_initialized_ = other.children_initialized_;
378     debug_counter_ = other.debug_counter_;
379     DIRECTORY_ENTRY_TRACE_CTOR();
380 
381     return *this;
382   }
383 
DirectoryEntryiorap::inode2filename::DirectoryEntry384   DirectoryEntry(DirectoryEntry&& other) noexcept {
385     DIRECTORY_ENTRY_MOVE_DCHECK();
386     other.moved_from_ = true;
387 
388     filename = std::move(other.filename);
389     d_ino = other.d_ino;
390     d_type = other.d_type;
391     children_paths_ = std::move(other.children_paths_);
392     children_initialized_ = other.children_initialized_;
393     debug_counter_ = other.debug_counter_;
394     DIRECTORY_ENTRY_TRACE_CTOR();
395   }
396 
397   // Create a sentinel (root of roots) whose children entries are those specified by
398   // children_paths.
CreateSentineliorap::inode2filename::DirectoryEntry399   static DirectoryEntry CreateSentinel(std::vector<std::string> children_paths) {
400     DirectoryEntry e;
401     e.d_type = DT_DIR;
402     ++gDebugDirectoryEntryCounter;
403 
404     for (std::string& child_path : children_paths) {
405       // TODO: Should we call Stat on the child path here to reconstitute the ino_t for a root dir?
406       // Otherwise it can look a little strange (i.e. the root dir itself will never match
407       // the searched inode).
408       //
409       // Probably not too big of a problem in practice.
410       DirectoryEntry child_entry{std::move(child_path), kInvalidIno, DT_DIR};
411       ResultT child_entry_as_result{std::move(child_entry)};
412       e.children_paths_.push_back(std::move(child_entry_as_result));
413     }
414 
415     e.children_initialized_ = true;
416 
417     return e;
418   }
419 
420   // Return an observable which emits the direct children only.
421   // The children entries are now read from disk (with readdir) if they weren't read previously.
GetChildrenEntriesiorap::inode2filename::DirectoryEntry422   std::vector<ResultT> GetChildrenEntries(borrowed<SystemCall*> system_call) const& {
423     BuildChildrenPaths(system_call);
424     return children_paths_;
425   }
426 
427   // Return an observable which emits the direct children only.
428   // The children entries are now read from disk (with readdir) if they weren't read previously.
429   // Movable overload.
GetChildrenEntriesiorap::inode2filename::DirectoryEntry430   std::vector<ResultT> GetChildrenEntries(borrowed<SystemCall*> system_call) && {
431     BuildChildrenPaths(system_call);
432     return std::move(children_paths_);
433   }
434 
435   // Returns a (lazy) observable that emits every single node, in pre-order,
436   // rooted at this tree.
437   //
438   // New entries are only read from disk (with e.g. readdir) when more values are pulled
439   // from the observable. Only the direct children of any entry are read at any time.
440   //
441   // The emission can be stopped prematurely by unsubscribing from the observable.
442   // This means the maximum amount of 'redundant' IO reads is bounded by the children count
443   // of all entries emitted thus far minus entries actually emitted.
444   ObservableT GetSubTreePreOrderEntries(borrowed<SystemCall*> system_call) const;
445 
446  private:
447   // Out-of-line definition to avoid circular type dependency.
448   void BuildChildrenPaths(borrowed<SystemCall*> system_call) const;
449 
450   // We need to lazily initialize children_paths_ only when we try to read them.
451   //
452   // Assuming the underlying file system doesn't change (which isn't strictly true),
453   // the directory children are referentially transparent.
454   //
455   // In practice we do not need to distinguish between the file contents changing out
456   // from under us in this code, so we don't need the more strict requirements.
457   mutable std::vector<ResultT> children_paths_;
458   mutable bool children_initialized_{false};
459 
460   friend std::ostream& operator<<(std::ostream& os, const DirectoryEntry& d);
461 };
462 
operator <<(std::ostream & os,const DirectoryEntry & d)463 std::ostream& operator<<(std::ostream& os, const DirectoryEntry& d) {
464   os << "DirectoryEntry{" << d.filename << ",ino:" << d.d_ino << ",type:" << d.d_type << "}";
465   return os;
466 }
467 
// Shorthand for 'either a DirectoryEntry or the DirectoryEntryError explaining its absence'.
using DirectoryEntryResult = DirectoryEntry::ResultT;
469 
470 // Read all directory entries and return it as a vector. This must be an eager operation,
471 // as readdir is not re-entrant.
472 //
473 // This could be considered as a limitation from the 'observable' perspective since
474 // one can end up reading unnecessary extra directory entries that are then never consumed.
475 //
476 // The following entries are skipped:
477 //  - '.' self
478 //  - ".." parent
479 //
480 // All DT types except the following are removed:
481 //  * DT_LNK - symbolic link (empty children)
482 //  * DT_REG - regular file  (empty children)
483 //  * DT_DIR - directory     (has children)
484 static std::vector<DirectoryEntryResult>
ReadDirectoryEntriesFromDirectoryPath(std::string dirpath,borrowed<SystemCall * > system_call)485     ReadDirectoryEntriesFromDirectoryPath(std::string dirpath, borrowed<SystemCall*> system_call) {
486   DIR *dirp;
487   struct dirent *dp;
488 
489   LOG(VERBOSE) << "ReadDirectoryEntriesFromDirectoryPath(" << dirpath << ")";
490 
491   if ((dirp = system_call->opendir(dirpath.c_str())) == nullptr) {
492     PLOG(ERROR) << "Couldn't open directory: " << dirpath;
493     return {DirectoryEntryError{kOpenDir, errno, dirpath}};
494   }
495 
496   // Read all the results up front because readdir is not re-entrant.
497   std::vector<DirectoryEntryResult> results;
498 
499   // Get full path + the directory entry path.
500   auto child_path = [&] { return dirpath + "/" + dp->d_name; };
501 
502   do {
503     errno = 0;
504     if ((dp = system_call->readdir(dirp)) != nullptr) {
505       if (dp->d_type == DT_DIR) {
506         if (strcmp(".", dp->d_name) == 0 || strcmp("..", dp->d_name) == 0) {
507           LOG(VERBOSE) << "Skip self/parent: " << dp->d_name;
508           continue;
509         }
510 
511         LOG(VERBOSE) << "Find entry " << child_path()
512                      << ", ino: " << dp->d_ino << ", type: " << dp->d_type;
513         results.push_back(DirectoryEntry{child_path(),
514                                          static_cast<ino_t>(dp->d_ino),
515                                          dp->d_type});
516       } else if (dp->d_type == DT_UNKNOWN) {
517         // This seems bad if it happens. We should probably do something about this.
518         LOG(WARNING) << "Found unknown DT entry: " << child_path();
519 
520         results.push_back(DirectoryEntryError{kDtUnknown, /*errno*/0, child_path()});
521       } else if (dp->d_type == DT_LNK || dp->d_type == DT_REG) {
522         // Regular non-directory file entry.
523         results.push_back(DirectoryEntry{child_path(),
524                                          static_cast<ino_t>(dp->d_ino),
525                                          dp->d_type});
526       } else {
527         // Block device, character device, socket, etc...
528         LOG(VERBOSE) << "Skip DT entry of type: " << dp->d_type << " " << child_path();
529       }
530     } else if (errno != 0) {
531       PLOG(ERROR) << "Error reading directory entry in " << dirpath;
532 
533       results.push_back(DirectoryEntryError{kReadDir, errno, dirpath});
534     }
535   } while (dp != nullptr);
536 
537   if (system_call->closedir(dirp) < 0) {
538     PLOG(ERROR) << "Failed to close directory " << dirpath;
539   }
540 
541   return results;
542 }
543 
BuildChildrenPaths(borrowed<SystemCall * > system_call) const544 void DirectoryEntry::BuildChildrenPaths(borrowed<SystemCall*> system_call) const {
545   if (children_initialized_) {
546     return;
547   }
548 
549   if (d_type == DT_DIR) {
550     children_paths_ = ReadDirectoryEntriesFromDirectoryPath(filename, system_call);
551     // TODO: consider using dependency injection here to substitute this function during testing?
552   }
553 }
554 
555 struct InodeSearchParameters {
556   std::vector<Inode> inode_list;
557   std::vector<std::string> root_dirs;
558 };
559 
560 // [IN]
561 // observable: expected<Value, Error>, ...
562 // [OUT]
563 // observable: Value, ...
564 //
565 // Any encountered 'Error' items are dropped after logging.
566 template <typename T>
MapExpectedOrLog(T && observable,::android::base::LogSeverity log_level)567 auto MapExpectedOrLog(T&& observable,
568                       ::android::base::LogSeverity log_level) {
569   return observable.filter([log_level](const auto& result) {
570     if (result) {
571       return true;
572     } else {
573       LOG(log_level) << result.error();
574       return false;
575     }
576   }).map([](auto&& result) {
577     return IORAP_FORWARD_LAMBDA(result).value();
578   });
579 }
580 
581 template <typename T>
MapExpectedOrLogError(T && observable)582 auto MapExpectedOrLogError(T&& observable) {
583   return MapExpectedOrLog(std::forward<T>(observable), ::android::base::ERROR);
584 }
585 
// Unwraps a stream of optional-like items into a stream of their values.
// Items without a value are silently dropped.
template <typename T>
auto MapOptionalOrDrop(T&& observable) {
  // TODO: static_assert this isn't used with an unexpected.
  auto has_value = [](const auto& result) {
    return result.has_value();
  };
  auto unwrap_value = [](auto&& result) {
    return IORAP_FORWARD_LAMBDA(result).value();
  };
  return observable.filter(std::move(has_value)).map(std::move(unwrap_value));
}
595 
596 template <typename T, typename F>
VisitValueOrLogError(T && expected,F && visit_func,const char * error_prefix="")597 auto VisitValueOrLogError(T&& expected, F&& visit_func, const char* error_prefix = "") {
598   if (!expected) {
599     LOG(ERROR) << error_prefix << " " << expected.error();
600   } else {
601     visit_func(std::forward<T>(expected).value());
602   }
603   // TODO: Could be good to make this more monadic by returning an optional.
604 }
605 
606 template <typename TSimple, typename T, typename F>
TreeTraversalPreOrderObservableImpl(rx::subscriber<TSimple> dest,T && node,F && fn)607 void TreeTraversalPreOrderObservableImpl(rx::subscriber<TSimple> dest, T&& node, F&& fn) {
608   LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (begin) " << __PRETTY_FUNCTION__;
609 
610   if (!dest.is_subscribed()) {
611     LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (unsubscribed)";
612     return;
613   } else {
614     LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (on_next node)";
615 
616     // Copy the node here. This is less bad than it seems since we haven't yet
617     // calculated its children (except in the root), so its just doing a shallow memcpy (sizeof(T)).
618     //
619     // This assumes the children are calculated lazily, otherwise we'd need to have a separate
620     // NodeBody class which only holds the non-children elements.
621 
622     TSimple copy = std::forward<T>(node);
623     dest.on_next(std::move(copy));
624 
625     if (!node.has_value()) {
626       return;
627     }
628 
629     // Whenever we call 'on_next' also check if we end up unsubscribing.
630     // This avoids the expensive call into the children.
631     if (!dest.is_subscribed()) {
632       LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (post-self unsubscribe)";
633       return;
634     }
635 
636     // Eagerly get the childrem, moving them instead of copying them.
637     auto&& children = fn(std::forward<T>(node));
638     for (auto&& child : children) {
639       TreeTraversalPreOrderObservableImpl(dest, IORAP_FORWARD_LAMBDA(child), fn);
640       // TODO: double check this is doing the std::move properly for rvalues.
641 
642       if (!dest.is_subscribed()) {
643         LOG(VERBOSE) << "TreeTraversalPreOrderObservableImpl (unsubscribed in children)";
644         break;
645       }
646     };
647   }
648 }
649 
// Creates an observable over all the nodes in the tree rooted at node.
// fn is a function that returns the children of that node.
//
// The items are emitted left-to-right pre-order, and stop early if the
// observable is unsubscribed from. on_completed is signalled once the traversal returns.
//
// Implementation requirement:
//    typeof(node) -> expected<V, E> or optional<V> or similar.
//    fn(node) -> iterable<typeof(node)>
//
// preorder(self):
//   visit(self)
//   for child in fn(self):
//     preorder(child)
template <typename T, typename F>
auto/*observable<T>*/ TreeTraversalPreOrderObservable(T&& node, F&& fn) {
  LOG(VERBOSE) << "TreeTraversalPreOrderObservable: " << __PRETTY_FUNCTION__;

  // The emitted item type: T with references and cv-qualifiers stripped.
  using T_simple = std::decay_t<T>;
  return rx::observable<>::create<T_simple>(
    // Copy node to avoid lifetime issues.
    [node=node,fn=std::forward<F>(fn)](rx::subscriber<T_simple> dest) {
      LOG(VERBOSE) << "TreeTraversalPreOrderObservable (lambda)";
      // The lambda is not 'mutable', so the captured node/fn are const here and the
      // std::move calls below produce const rvalues: nothing is actually moved out of the
      // captures.
      // NOTE(review): that looks required for the observable to be subscribed to more than
      // once -- confirm before making this lambda mutable.
      TreeTraversalPreOrderObservableImpl<T_simple>(dest,
                                                    std::move(node),
                                                    std::move(fn));
      dest.on_completed();
    }
  );
}
680 
681 DirectoryEntry::ObservableT
GetSubTreePreOrderEntries(borrowed<SystemCall * > system_call) const682     DirectoryEntry::GetSubTreePreOrderEntries(borrowed<SystemCall*> system_call) const {
683   return TreeTraversalPreOrderObservable(
684       DirectoryEntryResult{*this},
685       [system_call=system_call](auto/*DirectoryEntryResult*/&& result)
686           -> std::vector<DirectoryEntryResult> {
687         if (!result) {
688           LOG(VERBOSE) << "GetSubTreePreOrderEntries (no value return)";
689           // Cannot have children when it was an error.
690           return {};
691         }
692         return
693             IORAP_FORWARD_LAMBDA(result)
694             .value()
695             .GetChildrenEntries(system_call);
696       });
697 }
698 
// Describes a failed stat call.
struct StatError {
  // The errno value recorded for the failure.
  int err_no;
  // The path that was being stat'ed.
  std::string path_name;
};
703 
operator <<(std::ostream & os,const StatError & e)704 std::ostream& operator<<(std::ostream& os, const StatError& e) {
705   os << "StatError{" << e.err_no << "," << e.path_name
706      << ": " << strerror(e.err_no) << "}";
707   return os;
708 }
709 
710 template <typename U = void>  // suppress unused warning.
Stat(const std::string & path_name,borrowed<SystemCall * > system_call)711 static iorap::expected<struct stat, StatError> Stat(const std::string& path_name,
712                                                     borrowed<SystemCall*> system_call) {
713   struct stat statbuf{};
714 
715   // Call stat(2) in live code. Overridden in test code.
716   if (system_call->stat(path_name.c_str(), /*out*/&statbuf) == 0) {
717     return statbuf;
718   } else {
719     return iorap::unexpected(StatError{errno, path_name});
720   }
721 }
722 
// Shorthand for 'either a struct stat or the StatError explaining why there is none'.
using StatResult = iorap::expected<struct stat, StatError>;
724 
725 // An inode's corresponding filename on the system.
726 struct SearchMatch {
727   Inode inode;
728   // Relative path joined with a root directory.
729   //
730   // Use absolute path root dirs to get back absolute path filenames.
731   // If relative, this is relative to the current working directory.
732   std::string filename;
733 };
734 
operator <<(std::ostream & os,const SearchMatch & s)735 std::ostream& operator<<(std::ostream& os, const SearchMatch& s) {
736   os << "SearchMatch{" << s.inode << ", " << s.filename << "}";
737   return os;
738 }
739 
740 struct SearchState {
741   // Emit 'match' Inodes corresponding to the ones here.
742   InodeSet inode_set;
743 
744   // An inode matching one of the ones in inode_set was discovered in the most-recently
745   // emitted SearchState.
746   //
747   // The InodeSet removes any matching 'Inode'.
748   std::optional<SearchMatch> match;
749 
750   SearchState() = default;
751   SearchState(SearchState&& other) = default;
752 
753   // Do not copy this because copying InodeSet is excruciatingly slow.
754   SearchState(const SearchState& other) = delete;
755 
756   // TODO: make sure this doesn't copy [inodes], as that would be unnecessarily expensive.
757 };
758 
operator <<(std::ostream & os,const SearchState & s)759 std::ostream& operator<<(std::ostream& os, const SearchState& s) {
760   os << "SearchState{match:";
761   // Print the 'match' first. The InodeSet could be very large so it could be truncated in logs.
762   if (s.match) {
763     os << s.match.value();
764   } else {
765     os << "(none)";
766   }
767   os << ", inode_set:" << s.inode_set << "}";
768   return os;
769 }
770 
771 // TODO: write operator<< etc.
772 
773 // Return a lazy observable that will search for all filenames whose inodes
774 // match the inodes in inode_search_list.
775 //
776 // Every unmatched inode will be emitted as an unexpected at the end of the stream.
SearchDirectoriesForMatchingInodes(std::vector<std::string> root_dirs,std::vector<Inode> inode_search_list,borrowed<SystemCall * > system_call)777 auto/*[observable<InodeResult>, connectable]*/ SearchDirectoriesForMatchingInodes(
778     std::vector<std::string> root_dirs,
779     std::vector<Inode> inode_search_list,
780     borrowed<SystemCall*> system_call) {
781 
782   // Create a (lazy) observable that will emit each DirectoryEntry that is a recursive subchild
783   // of root_dirs. Emission will be stopped when its unsubscribed from.
784   //
785   // This is done by calling readdir(3) lazily.
786   auto/*obs<DirectoryEntry>*/ find_all_subdir_entries = ([&]() {
787     DirectoryEntry sentinel = DirectoryEntry::CreateSentinel(std::move(root_dirs));
788     auto/*obs<DirectoryEntryResult*/ results = sentinel.GetSubTreePreOrderEntries(system_call);
789 
790     // Drop any errors by logging them to logcat. "Unwrap" the expected into the underlying data.
791     auto/*obs<DirectoryEntry*>*/ expected_drop_errors = MapExpectedOrLogError(std::move(results));
792     return expected_drop_errors;
793   })();
794 
795   // DirectoryEntry is missing the dev_t portion, so we may need to call scan(2) again
796   // to confirm the dev_t. We skip calling scan(2) when the ino_t does not match.
797   // InodeSet lets us optimally avoid calling scan(2).
798   std::shared_ptr<SearchState> initial = std::make_shared<SearchState>();
799   initial->inode_set = InodeSet::OfList(inode_search_list);
800 
801   auto/*[observable<SearchState>,Connectable]*/ search_state_results = find_all_subdir_entries.scan(
802       std::move(initial),
803       [system_call=system_call](std::shared_ptr<SearchState> search_state,
804                                 const DirectoryEntry& dir_entry) {
805         LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#Scan "
806                      << dir_entry << ", state: " << *search_state;
807 
808         search_state->match = std::nullopt;
809 
810         InodeSet* inodes = &search_state->inode_set;
811 
812         // Find all the possible inodes across different devices.
813         InodeSet::ValueRange inode_list = inodes->FindInodeList(dir_entry.d_ino);
814 
815         // This directory doesn't correspond to any inodes we are searching for.
816         if (!inode_list) {
817           return search_state;
818         }
819 
820         StatResult maybe_stat = Stat(dir_entry.filename, system_call);
821         VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
822           // Try to match the specific inode. Usually this will not result in a match (nullopt).
823           std::optional<Inode> inode = inodes->FindAndRemoveInodeInList(inode_list, stat_buf);
824 
825           if (inode) {
826             search_state->match = SearchMatch{inode.value(), dir_entry.filename};
827           }
828         });
829 
830         return search_state;
831       }
832   // Avoid exhausting a potentially 'infinite' stream of files by terminating as soon
833   // as we find every single inode we care about.
834   ).take_while([](std::shared_ptr<SearchState> state) {
835       // Also emit the last item that caused the search set to go empty.
836       bool cond = !state->inode_set.Empty() || state->match;
837 
838       if (WOULD_LOG(VERBOSE)) {
839         static int kCounter = 0;
840         LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#take_while (" << kCounter++ <<
841                      ",is_empty:"
842                      << state->inode_set.Empty() << ", match:" << state->match.has_value();
843       }
844       // Minor O(1) implementation inefficiency:
845       // (Too minor to fix but it can be strange if looking at the logs or readdir traces).
846       //
847       // Note, because we return 'true' after the search set went empty,
848       // the overall stream graph still pulls from search_state_results exactly once more:
849       //
850       // This means that for cond to go to false, we would've read one extra item and then discarded
851       // it. If that item was the first child of a directory, that means we essentially did
852       // one redundant pass of doing a readdir.
853       // In other words if the search set goes to empty while the current item is a directory,
854       //
855       // it will definitely readdir on it at least once as we try to get the first child in
856       // OnTreeTraversal.
857       //
858       // This could be fixed with a 'take_until(Predicate)' operator which doesn't discard
859       // the last item when the condition becomes false. However rxcpp seems to lack this operator,
860       // whereas RxJava has it.
861 
862       if (!cond) {
863         LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#take_while "
864                      << "should now terminate for " << *state;
865       }
866 
867       return cond;
868   }).publish();
869   // The publish here is mandatory. The stream is consumed twice (once by matched and once by
870   // unmatched streams). Without the publish, once all items from 'matched' were consumed it would
871   // start another instance of 'search_state_results' (i.e. it appears as if the search
872   // is restarted).
873   //
874   // By using 'publish', the search_state_results is effectively shared by both downstream nodes.
875   // Note that this also requires the subscriber to additionally call #connect on the above stream,
876   // otherwise no work will happen.
877 
878   // Lifetime notes:
879   //
880   // The the 'SearchState' is emitted into both below streams simultaneously.
881   //    The 'unmatched_inode_values' only touches the inode_set.
882   //    The 'matched_inode_values' only touches the match.
883   // Either stream can 'std::move' from those fields because they don't move each other's fields.
884   auto/*observable<InodeResult>*/ matched_inode_values = search_state_results
885       .filter([](std::shared_ptr<SearchState> search_state) {
886                   return search_state->match.has_value(); })
887       .map([](std::shared_ptr<SearchState> search_state) {
888                   return std::move(search_state->match.value()); })
889                      // observable<SearchMatch>
890       .map([](SearchMatch search_match) {
891           return InodeResult::makeSuccess(search_match.inode, std::move(search_match.filename));
892       });            // observable<InodeResult>
893 
894   auto/*observable<?>*/ unmatched_inode_values = search_state_results
895       // The 'last' SearchState is the one that contains all the remaining inodes.
896       .take_last(1)  // observable<SearchState>
897       .flat_map([](std::shared_ptr<SearchState> search_state) {
898           LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#unmatched -- flat_map";
899           // Aside: Could've used a move here if the inodes weren't so lightweight already.
900           return search_state->inode_set.IterateValues(); })
901                      // observable<Inode>
902       .map([](const Inode& inode) {
903           LOG(VERBOSE) << "SearchDirectoriesForMatchingInodes#unmatched -- map";
904           return InodeResult::makeFailure(inode, InodeResult::kCouldNotFindFilename);
905       });
906                      // observable<InodeResult>
907 
908   // The matched and unmatched InodeResults are emitted together.
909   //   Use merge, not concat, because we need both observables to be subscribed to simultaneously.
910 
911   auto/*observable<InodeResult*/ all_inode_results =
912       matched_inode_values.merge(unmatched_inode_values);
913 
914   // Now that all mid-stream observables have been connected, turn the Connectable observable
915   // into a regular observable.
916 
917   // The caller has to call 'connect' on the search_state_results after subscribing
918   // and before any work can actually start.
919   return std::make_pair(all_inode_results, search_state_results);
920 }
921 
FindFilenamesFromInodes(std::vector<std::string> root_directories,std::vector<Inode> inode_list,SearchMode mode) const922 rxcpp::observable<InodeResult> SearchDirectories::FindFilenamesFromInodes(
923     std::vector<std::string> root_directories,
924     std::vector<Inode> inode_list,
925     SearchMode mode) const {
926   DCHECK(mode == SearchMode::kInProcessDirect) << " other modes not implemented yet";
927 
928   auto/*observable[2]*/ [inode_results, connectable] = SearchDirectoriesForMatchingInodes(
929       std::move(root_directories),
930       std::move(inode_list),
931       system_call_);
932 
933   return inode_results.ref_count(connectable);
934 }
935 
936 // I think we could avoid this with auto_connect, which rxcpp doesn't seem to have.
937 //
938 // I can't figure out any other way to avoid this, or at least to allow connecting
939 // on the primary observable (instead of a secondary side-observable).
940 //
941 // If using the obvious publish+ref_count then the unmerged stream gets no items emitted into it.
942 // If tried to ref_count later, everything turns into no-op.
943 // If trying to call connect too early, the subscribe is missed.
944 template <typename T>
945 struct RxAnyConnectableFromObservable : public SearchDirectories::RxAnyConnectable {
connectiorap::inode2filename::RxAnyConnectableFromObservable946   virtual void connect() override {
947     observable.connect();
948   }
949 
~RxAnyConnectableFromObservableiorap::inode2filename::RxAnyConnectableFromObservable950   virtual ~RxAnyConnectableFromObservable() {}
951 
RxAnyConnectableFromObservableiorap::inode2filename::RxAnyConnectableFromObservable952   RxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable)
953     : observable(observable) {
954   }
955 
956   rxcpp::connectable_observable<T> observable;
957 };
958 
959 // Type deduction helper.
960 template <typename T>
961 std::unique_ptr<SearchDirectories::RxAnyConnectable>
MakeRxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable)962     MakeRxAnyConnectableFromObservable(rxcpp::connectable_observable<T> observable) {
963   SearchDirectories::RxAnyConnectable* ptr = new RxAnyConnectableFromObservable<T>{observable};
964   return std::unique_ptr<SearchDirectories::RxAnyConnectable>{ptr};
965 }
966 
967 std::pair<rxcpp::observable<InodeResult>, std::unique_ptr<SearchDirectories::RxAnyConnectable>>
FindFilenamesFromInodesPair(std::vector<std::string> root_directories,std::vector<Inode> inode_list,SearchMode mode) const968     SearchDirectories::FindFilenamesFromInodesPair(
969         std::vector<std::string> root_directories,
970         std::vector<Inode> inode_list,
971         SearchMode mode) const {
972   DCHECK(mode == SearchMode::kInProcessDirect) << " other modes not implemented yet";
973 
974   auto/*observable[2]*/ [inode_results, connectable] = SearchDirectoriesForMatchingInodes(
975       std::move(root_directories),
976       std::move(inode_list),
977       system_call_);
978 
979   std::unique_ptr<SearchDirectories::RxAnyConnectable> connectable_ptr =
980     MakeRxAnyConnectableFromObservable(connectable.as_dynamic());
981 
982   return {inode_results, std::move(connectable_ptr)};
983 }
984 
985 rxcpp::observable<InodeResult>
FindFilenamesFromInodes(std::vector<std::string> root_directories,rxcpp::observable<Inode> inodes,SearchMode mode) const986     SearchDirectories::FindFilenamesFromInodes(std::vector<std::string> root_directories,
987                                                rxcpp::observable<Inode> inodes,
988                                                SearchMode mode) const {
989 
990   // It's inefficient to search for inodes until the full search list is available,
991   // so first reduce to a vector so we can access all the inodes simultaneously.
992   return inodes.reduce(std::vector<Inode>{},
993                        [](std::vector<Inode> vec, Inode inode) {
994                          vec.push_back(inode);
995                          return vec;
996                        },
997                        [](std::vector<Inode> v){
998                          return v;  // TODO: use an identity function
999                        })
1000     .flat_map([root_directories=std::move(root_directories), mode, self=*this]
1001           (std::vector<Inode> vec) {
1002       // All borrowed values (e.g. SystemCall) must outlive the observable.
1003       return self.FindFilenamesFromInodes(root_directories, vec, mode);
1004     }
1005   );
1006 }
1007 
EmitAllInodesFromDirectories(std::vector<std::string> root_dirs,borrowed<SystemCall * > system_call)1008 auto/*[observable<InodeResult>]*/ EmitAllInodesFromDirectories(
1009     std::vector<std::string> root_dirs,
1010     borrowed<SystemCall*> system_call) {
1011 
1012   // Create a (lazy) observable that will emit each DirectoryEntry that is a recursive subchild
1013   // of root_dirs. Emission will be stopped when its unsubscribed from.
1014   //
1015   // This is done by calling readdir(3) lazily.
1016   auto/*obs<DirectoryEntry>*/ find_all_subdir_entries = ([&]() {
1017     DirectoryEntry sentinel = DirectoryEntry::CreateSentinel(std::move(root_dirs));
1018     auto/*obs<DirectoryEntryResult*/ results = sentinel.GetSubTreePreOrderEntries(system_call);
1019 
1020     // Drop any errors by logging them to logcat. "Unwrap" the expected into the underlying data.
1021     auto/*obs<DirectoryEntry*>*/ expected_drop_errors = MapExpectedOrLogError(std::move(results));
1022     return expected_drop_errors;
1023   })();
1024 
1025   // Fill in -1 for the dev_t since readdir only returns the ino_t.
1026   // The caller of this function is expected to call stat(2) later on to fill in
1027   // the full data.
1028   return find_all_subdir_entries.map([](DirectoryEntry e) {
1029     return InodeResult::makeSuccess(Inode::FromDeviceAndInode(-1, e.d_ino), std::move(e.filename));
1030   });
1031 }
1032 
1033 rxcpp::observable<InodeResult>
ListAllFilenames(std::vector<std::string> root_directories) const1034     SearchDirectories::ListAllFilenames(std::vector<std::string> root_directories) const {
1035   // TODO: refactor implementation into DiskScanDataSource.
1036   return EmitAllInodesFromDirectories(std::move(root_directories),
1037                                       /*borrowed*/system_call_);
1038 }
1039 
1040 struct FilterState {
1041   // Emit 'match' Inodes corresponding to the ones here.
1042   InodeSet inode_set;
1043 
1044   // An inode matching one of the ones in inode_set was discovered in the most-recently
1045   // emitted SearchState.
1046   //
1047   // The InodeSet removes any matching 'Inode'.
1048   std::optional<InodeResult> match;
1049 
1050   FilterState() = default;
1051   FilterState(FilterState&& other) = default;
1052 
1053   // Copying the InodeSet is expensive, so forbid any copies.
1054   FilterState(const FilterState& other) = delete;
1055 };
1056 
operator <<(std::ostream & os,const FilterState & s)1057 std::ostream& operator<<(std::ostream& os, const FilterState& s) {
1058   os << "FilterState{match:";
1059   // Print the 'match' first. The InodeSet could be very large so it could be truncated in logs.
1060   if (s.match) {
1061     os << s.match.value();
1062   } else {
1063     os << "(none)";
1064   }
1065   os << ", inode_set:" << s.inode_set << "}";
1066   return os;
1067 }
1068 
FilterFilenamesForSpecificInodes(rxcpp::observable<InodeResult> all_inodes,std::vector<Inode> inode_list,bool missing_device_number,bool needs_verification) const1069 rxcpp::observable<InodeResult> SearchDirectories::FilterFilenamesForSpecificInodes(
1070     rxcpp::observable<InodeResult> all_inodes,
1071     std::vector<Inode> inode_list,
1072     bool missing_device_number,   // missing dev_t portion?
1073     bool needs_verification) const {
1074   // TODO: refactor into InodeResolver
1075 
1076   borrowed<SystemCall*> system_call = system_call_;
1077 
1078   // InodeResult may be missing the dev_t portion, so we may need to call scan(2) again
1079   // to confirm the dev_t. We skip calling scan(2) when the ino_t does not match.
1080   // InodeSet lets us optimally avoid calling scan(2).
1081   std::shared_ptr<FilterState> initial = std::make_shared<FilterState>();
1082   initial->inode_set = InodeSet::OfList(inode_list);
1083 
1084   auto/*[observable<FilterState>,Connectable]*/ filter_state_results = all_inodes.scan(
1085       std::move(initial),
1086       [system_call, missing_device_number]
1087           (std::shared_ptr<FilterState> filter_state, InodeResult inode_result) {
1088         LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#Scan "
1089                      << inode_result << ", state: " << *filter_state;
1090 
1091         filter_state->match = std::nullopt;
1092 
1093         InodeSet* inodes = &filter_state->inode_set;
1094 
1095         // Find all the possible (dev_t, ino_t) potential needles given an ino_t in the haystack.
1096         InodeSet::ValueRange inode_list = inodes->FindInodeList(inode_result.inode.inode);
1097 
1098         // This inode result doesn't correspond to any inodes we are searching for.
1099         if (!inode_list) {
1100           // Drop the result and keep going.
1101           return filter_state;
1102         }
1103 
1104         if (missing_device_number) {
1105           // Need to fill in dev_t by calling stat(2).
1106           VisitValueOrLogError(std::move(inode_result.data), [&](std::string filename) {
1107             StatResult maybe_stat = Stat(filename, system_call);
1108             VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
1109               // Try to match the specific inode. Usually this will not result in a match (nullopt).
1110               std::optional<Inode> inode = inodes->FindAndRemoveInodeInList(inode_list, stat_buf);
1111 
1112               if (inode) {
1113                 filter_state->match = InodeResult::makeSuccess(inode.value(), std::move(filename));
1114               }
1115             });
1116 
1117             // Note: stat errors are logged here to make the error closer to the occurrence.
1118             // In theory, we could just return it as an InodeResult but then the error would
1119             // just get logged elsewhere.
1120           });
1121         } else {
1122             // Trust the dev_t in InodeResult is valid. Later passes can verify it.
1123 
1124             // Try to match the specific inode. Usually this will not result in a match (nullopt).
1125             std::optional<Inode> inode =
1126                 inodes->FindAndRemoveInodeInList(inode_list, inode_result.inode);
1127 
1128             if (inode) {
1129               filter_state->match = inode_result;
1130             }
1131 
1132             // Note that the InodeResult doesn't necessarily need to have a valid filename here.
1133             // If the earlier pass returned an error-ed result, this will forward the error code.
1134         }
1135 
1136         return filter_state;
1137       }
1138   // Avoid exhausting a potentially 'infinite' stream of files by terminating as soon
1139   // as we find every single inode we care about.
1140   ).take_while([](std::shared_ptr<FilterState> state) {
1141       // Also emit the last item that caused the search set to go empty.
1142       bool cond = !state->inode_set.Empty() || state->match;
1143 
1144       if (WOULD_LOG(VERBOSE)) {
1145         static int kCounter = 0;
1146         LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#take_while (" << kCounter++ <<
1147                      ",is_empty:"
1148                      << state->inode_set.Empty() << ", match:" << state->match.has_value();
1149       }
1150       // Minor O(1) implementation inefficiency:
1151       // (Too minor to fix but it can be strange if looking at the logs or readdir traces).
1152       //
1153       // Note, because we return 'true' after the search set went empty,
1154       // the overall stream graph still pulls from filter_state_results exactly once more:
1155       //
1156       // This means that for cond to go to false, we would've read one extra item and then discarded
1157       // it. If that item was the first child of a directory, that means we essentially did
1158       // one redundant pass of doing a readdir.
1159       // In other words if the search set goes to empty while the current item is a directory,
1160       //
1161       // it will definitely readdir on it at least once as we try to get the first child in
1162       // OnTreeTraversal.
1163       //
1164       // This could be fixed with a 'take_until(Predicate)' operator which doesn't discard
1165       // the last item when the condition becomes false. However rxcpp seems to lack this operator,
1166       // whereas RxJava has it.
1167 
1168       if (!cond) {
1169         LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#take_while "
1170                      << "should now terminate for " << *state;
1171       }
1172 
1173       return cond;
1174   }).publish();
1175   // The publish here is mandatory. The stream is consumed twice (once by matched and once by
1176   // unmatched streams). Without the publish, once all items from 'matched' were consumed it would
1177   // start another instance of 'filter_state_results' (i.e. it appears as if the search
1178   // is restarted).
1179   //
1180   // By using 'publish', the filter_state_results is effectively shared by both downstream nodes.
1181   // Note that this also requires the subscriber to additionally call #connect on the above stream,
1182   // otherwise no work will happen.
1183 
1184   // Lifetime notes:
1185   //
1186   // The the 'FilterState' is emitted into both below streams simultaneously.
1187   //    The 'unmatched_inode_values' only touches the inode_set.
1188   //    The 'matched_inode_values' only touches the match.
1189   // Either stream can 'std::move' from those fields because they don't move each other's fields.
1190   auto/*observable<InodeResult>*/ matched_inode_values = filter_state_results
1191       .filter([](std::shared_ptr<FilterState> filter_state) {
1192                   return filter_state->match.has_value(); })
1193       .map([](std::shared_ptr<FilterState> filter_state) {
1194                   return std::move(filter_state->match.value()); });
1195                      // observable<InodeResult>
1196 
1197   auto/*observable<?>*/ unmatched_inode_values = filter_state_results
1198       // The 'last' FilterState is the one that contains all the remaining inodes.
1199       .take_last(1)  // observable<FilterState>
1200       .flat_map([](std::shared_ptr<FilterState> filter_state) {
1201           LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#unmatched -- flat_map";
1202           // Aside: Could've used a move here if the inodes weren't so lightweight already.
1203           return filter_state->inode_set.IterateValues(); })
1204                      // observable<Inode>
1205       .map([](const Inode& inode) {
1206           LOG(VERBOSE) << "FilterFilenamesForSpecificInodes#unmatched -- map";
1207           return InodeResult::makeFailure(inode, InodeResult::kCouldNotFindFilename);
1208       });
1209                      // observable<InodeResult>
1210 
1211   // The matched and unmatched InodeResults are emitted together.
1212   //   Use merge, not concat, because we need both observables to be subscribed to simultaneously.
1213 
1214   auto/*observable<InodeResult*/ all_inode_results =
1215       matched_inode_values.merge(unmatched_inode_values);
1216 
1217   // Verify the inode results by calling stat(2).
1218   // Unverified results are turned into an error.
1219 
1220   auto/*observable<InodeResult>*/ verified_inode_results =
1221     all_inode_results.map([needs_verification, system_call](InodeResult result) {
1222       if (!needs_verification || !result) {
1223         // Skip verification if requested, or if the result didn't have a filename.
1224         return result;
1225       }
1226 
1227       const std::string& filename = result.data.value();
1228       StatResult maybe_stat = Stat(filename, system_call);
1229 
1230       if (maybe_stat)
1231       {
1232         if (result.inode == Inode::FromDeviceAndInode(maybe_stat->st_dev, maybe_stat->st_ino)) {
1233           return result;
1234         } else {
1235           LOG(WARNING)
1236               << "FilterFilenamesForSpecificInodes#verified fail out-of-date inode: " << result;
1237           return InodeResult::makeFailure(result.inode, InodeResult::kVerificationFailed);
1238         }
1239       } else {
1240         // Forward stat errors directly, as it could be a missing security rule,
1241         // but turn -ENOENT into casual verification errors.
1242         const StatError& err = maybe_stat.error();
1243         int error_code = err.err_no;
1244         if (err.err_no == ENOENT) {
1245           error_code = InodeResult::kVerificationFailed;
1246 
1247           // TODO: Don't LOG(WARNING) here because this could be very common if we
1248           // access the data much much later after the initial results were read in.
1249           LOG(WARNING)
1250               << "FilterFilenamesForSpecificInodes#verified fail out-of-date filename: " << result;
1251         } else {
1252           LOG(ERROR)
1253               << "FilterFilenamesForSpecificInodes#verified stat(2) failure: " << err;
1254         }
1255 
1256         return InodeResult::makeFailure(result.inode, error_code);
1257       }
1258     });
1259 
1260   // Now that all mid-stream observables have been connected, turn the Connectable observable
1261   // into a regular observable.
1262   return verified_inode_results.ref_count(filter_state_results);
1263 }
1264 
EmitAllFilenames(rxcpp::observable<InodeResult> all_inodes,bool missing_device_number,bool needs_verification) const1265 rxcpp::observable<InodeResult> SearchDirectories::EmitAllFilenames(
1266     rxcpp::observable<InodeResult> all_inodes,
1267     bool missing_device_number,   // missing dev_t portion?
1268     bool needs_verification) const {
1269   // TODO: refactor into InodeResolver
1270 
1271   borrowed<SystemCall*> system_call = system_call_;
1272 
1273   // InodeResult may be missing the dev_t portion, so we may need to call scan(2) again
1274   // to confirm the dev_t.
1275 
1276   using EmitAllState = std::optional<InodeResult>;
1277 
1278   auto/*[observable<FilterState>,Connectable]*/ all_inode_results = all_inodes.map(
1279       [system_call, missing_device_number](InodeResult inode_result) {
1280         LOG(VERBOSE) << "EmitAllFilenames#map "
1281                      << inode_result;
1282 
1283         // Could fail if the device number is missing _and_ stat(2) fails.
1284         EmitAllState match = std::nullopt;
1285 
1286         if (missing_device_number) {
1287           // Need to fill in dev_t by calling stat(2).
1288           VisitValueOrLogError(std::move(inode_result.data), [&](std::string filename) {
1289             StatResult maybe_stat = Stat(filename, system_call);
1290             VisitValueOrLogError(maybe_stat, [&](const struct stat& stat_buf) {
1291               Inode inode = Inode::FromDeviceAndInode(stat_buf.st_dev, stat_buf.st_ino);
1292               match = InodeResult::makeSuccess(inode, std::move(filename));
1293             });
1294 
1295             // Note: stat errors are logged here to make the error closer to the occurrence.
1296             // In theory, we could just return it as an InodeResult but then the error would
1297             // just get logged elsewhere.
1298           });
1299         } else {
1300             // Trust the dev_t in InodeResult is valid. Later passes can verify it.
1301             match = std::move(inode_result);
1302 
1303             // Note that the InodeResult doesn't necessarily need to have a valid filename here.
1304             // If the earlier pass returned an error-ed result, this will forward the error code.
1305         }
1306 
1307         return match;  // implicit move.
1308       }
1309   );
1310 
1311   auto/*observable<InodeResult>*/ matched_inode_values = all_inode_results
1312       .filter([](const EmitAllState& filter_state) { return filter_state.has_value(); })
1313       .map([](EmitAllState& filter_state) { return std::move(filter_state.value()); });
1314                      // observable<InodeResult>
1315 
1316   // Verify the inode results by calling stat(2).
1317   // Unverified results are turned into an error.
1318 
1319   auto/*observable<InodeResult>*/ verified_inode_results =
1320     matched_inode_values.map([needs_verification, system_call](InodeResult result) {
1321       if (!needs_verification || !result) {
1322         // Skip verification if requested, or if the result didn't have a filename.
1323         return result;
1324       }
1325 
1326       const std::string& filename = result.data.value();
1327       StatResult maybe_stat = Stat(filename, system_call);
1328 
1329       if (maybe_stat)
1330       {
1331         if (result.inode == Inode::FromDeviceAndInode(maybe_stat->st_dev, maybe_stat->st_ino)) {
1332           return result;
1333         } else {
1334           LOG(WARNING)
1335               << "EmitAllFilenames#verified fail out-of-date inode: " << result;
1336           return InodeResult::makeFailure(result.inode, InodeResult::kVerificationFailed);
1337         }
1338       } else {
1339         // Forward stat errors directly, as it could be a missing security rule,
1340         // but turn -ENOENT into casual verification errors.
1341         const StatError& err = maybe_stat.error();
1342         int error_code = err.err_no;
1343         if (err.err_no == ENOENT) {
1344           error_code = InodeResult::kVerificationFailed;
1345 
1346           // TODO: Don't LOG(WARNING) here because this could be very common if we
1347           // access the data much much later after the initial results were read in.
1348           LOG(WARNING)
1349               << "EmitAllFilenames#verified fail out-of-date filename: " << result;
1350         } else {
1351           LOG(ERROR)
1352               << "EmitAllFilenames#verified stat(2) failure: " << err;
1353         }
1354 
1355         return InodeResult::makeFailure(result.inode, error_code);
1356       }
1357     });
1358 
1359   // TODO: refactor this function some more with the Find(inode_set) equivalent.
1360 
1361   // Now that all mid-stream observables have been connected, turn the Connectable observable
1362   // into a regular observable.
1363   return verified_inode_results;
1364 }
1365 
1366 }  // namespace iorap::inode2filename
1367