1 // Copyright (C) 2017 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "read_ahead.h"
16 
17 #include "common/trace.h"
18 #include "prefetcher/session_manager.h"
19 #include "prefetcher/session.h"
20 #include "prefetcher/task_id.h"
21 #include "serialize/arena_ptr.h"
22 #include "serialize/protobuf_io.h"
23 
24 #include <android-base/chrono_utils.h>
25 #include <android-base/logging.h>
26 #include <android-base/scopeguard.h>
27 #include <android-base/properties.h>
28 #include <android-base/unique_fd.h>
29 #include <cutils/trace.h>
30 #include <deque>
31 #include <fcntl.h>
32 #include <functional>
33 #include <stdint.h>
34 #include <sys/mman.h>
35 #include <sys/time.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <unordered_map>
39 #include <utils/Printer.h>
40 
41 namespace iorap {
42 namespace prefetcher {
43 
// Readahead strategy selectable through the "iorapd.readahead.strategy" property
// (see GetPrefetchStrategy()).
//
// BeginTask() static_casts the selected value to ReadAheadKind, so these numeric
// values presumably mirror that enum -- TODO confirm against its definition.
enum class PrefetchStrategy {
  kFadvise = 0,     // property value "fadvise"; also used when the property is unset/unknown.
  kMmapLocked = 1,  // property value "mmap".
  kMlock = 2,       // property value "mlock".
};
49 
// Streams a short lowercase label for `ps`. Any value that is not one of the
// named enumerators is printed as "<invalid>".
std::ostream& operator<<(std::ostream& os, PrefetchStrategy ps) {
  const char* label = "<invalid>";

  switch (ps) {
    case PrefetchStrategy::kFadvise:
      label = "fadvise";
      break;
    case PrefetchStrategy::kMmapLocked:
      label = "mmap";
      break;
    case PrefetchStrategy::kMlock:
      label = "mlock";
      break;
  }

  return os << label;
}
66 
// Compile-time default strategy. Not referenced elsewhere in the visible part of this
// file; the strategy actually used is picked at runtime by GetPrefetchStrategy().
static constexpr PrefetchStrategy kPrefetchStrategy = PrefetchStrategy::kFadvise;
// Cap on the number of successfully prefetched bytes per BeginTask() call. Read once
// from the "iorapd.max_prefetch_bytes" property when this static is initialized.
static uint64_t kMaxPrefetchBytes = ::android::base::GetUintProperty<uint64_t>(
    "iorapd.max_prefetch_bytes", /*default*/100 * 1024 * 1024); // 100MB by default
70 
GetPrefetchStrategy()71 static PrefetchStrategy GetPrefetchStrategy() {
72   PrefetchStrategy strat = PrefetchStrategy::kFadvise;
73 
74   std::string prefetch_env =
75       ::android::base::GetProperty("iorapd.readahead.strategy", /*default*/"");
76 
77   if (prefetch_env == "") {
78     LOG(VERBOSE)
79         << "ReadAhead strategy defaulted. Did you want to set iorapd.readahead.strategy ?";
80   } else if (prefetch_env == "mmap") {
81     strat = PrefetchStrategy::kMmapLocked;
82     LOG(VERBOSE) << "ReadAhead strategy: kMmapLocked";
83   } else if (prefetch_env == "mlock") {
84     strat = PrefetchStrategy::kMlock;
85     LOG(VERBOSE) << "ReadAhead strategy: kMlock";
86   } else if (prefetch_env == "fadvise") {
87     strat = PrefetchStrategy::kFadvise;
88     LOG(VERBOSE) << "ReadAhead strategy: kFadvise";
89   } else {
90     LOG(WARNING) << "Unknown iorapd.readahead.strategy: " << prefetch_env << ", ignoring";
91   }
92 
93   return strat;
94 }
95 
96 struct TaskData {
97   TaskId task_id;   // also the session ID.
98 
SessionIdiorap::prefetcher::TaskData99   size_t SessionId() const {
100     if (session != nullptr) {
101       DCHECK_EQ(session->SessionId(), task_id.id);
102     }
103 
104     // good enough to be used as the session ID. Task IDs are always monotonically increasing.
105     return task_id.id;
106   }
107 
108   std::shared_ptr<Session> session;
109   int32_t trace_cookie;  // async trace cookie in BeginTask/FinishTask.
110 };
111 
// Remember the last 5 files being prefetched.
// Used by RecentDataKeeper::RecordRecent() as the size threshold for dropping
// the oldest entries.
static constexpr size_t kRecentDataCount = 5;
114 
// One record in the recent-prefetch history printed by RecentDataKeeper::Dump().
struct RecentData {
  TaskId task_id;           // task that was prefetched; its path and id are printed on dump.
  size_t file_lengths_sum;  // byte total reported for the task (printed as "Bytes count").
};
119 
120 struct RecentDataKeeper {
121   std::deque<RecentData> recents_;
122   std::mutex mutex_;
123 
RecordRecentiorap::prefetcher::RecentDataKeeper124   void RecordRecent(TaskId task_id, size_t file_lengths_sum) {
125     std::lock_guard<std::mutex> guard{mutex_};
126 
127     while (recents_.size() > kRecentDataCount) {
128       recents_.pop_front();
129     }
130     recents_.push_back(RecentData{std::move(task_id), file_lengths_sum});
131   }
132 
Dumpiorap::prefetcher::RecentDataKeeper133   void Dump(/*borrow*/::android::Printer& printer) {
134     bool locked = mutex_.try_lock();
135 
136     printer.printFormatLine("Recent prefetches:");
137     if (!locked) {
138       printer.printLine("""""  (possible deadlock)");
139     }
140 
141     for (const RecentData& data : recents_) {
142       printer.printFormatLine("  %s", data.task_id.path.c_str());
143       printer.printFormatLine("    Task ID: %zu", data.task_id.id);
144       printer.printFormatLine("    Bytes count: %zu", data.file_lengths_sum);
145     }
146 
147     if (recents_.empty()) {
148       printer.printFormatLine("  (None)");
149     }
150 
151     printer.printLine("");
152 
153     if (locked) {
154       mutex_.unlock();
155     }
156   }
157 };
158 
159 struct ReadAhead::Impl {
Impliorap::prefetcher::ReadAhead::Impl160   Impl(bool use_sockets) {
161     // Flip this property to test in-process vs out-of-process for the prefetcher code.
162     bool out_of_process =
163         ::android::base::GetBoolProperty("iorapd.readahead.out_of_process", /*default*/true);
164 
165     SessionKind session_kind =
166         out_of_process ? SessionKind::kOutOfProcessIpc : SessionKind::kInProcessDirect;
167 
168     if (use_sockets) {
169       session_kind = SessionKind::kOutOfProcessSocket;
170     }
171 
172     session_manager_ = SessionManager::CreateManager(session_kind);
173     session_kind_ = session_kind;
174   }
175 
176   std::unique_ptr<SessionManager> session_manager_;
177   SessionKind session_kind_;
178   std::unordered_map<size_t /*task index*/, TaskData> read_ahead_file_map_;
179   static RecentDataKeeper recent_data_keeper_;
180   int32_t trace_cookie_{0};
181 
UseSocketsiorap::prefetcher::ReadAhead::Impl182   bool UseSockets() const {
183     return session_kind_ == SessionKind::kOutOfProcessSocket;
184   }
185 };
186 
// Out-of-class definition of the static member declared in ReadAhead::Impl; being
// static, this one RecentDataKeeper is shared by every ReadAhead instance.
RecentDataKeeper ReadAhead::Impl::recent_data_keeper_{};
188 
ReadAhead()189 ReadAhead::ReadAhead() : ReadAhead(/*use_sockets*/false) {
190 }
191 
ReadAhead(bool use_sockets)192 ReadAhead::ReadAhead(bool use_sockets) : impl_(new Impl(use_sockets)) {
193 }
194 
~ReadAhead()195 ReadAhead::~ReadAhead() {}
196 
PerformReadAhead(std::shared_ptr<Session> session,size_t path_id,ReadAheadKind kind,size_t length,size_t offset)197 static bool PerformReadAhead(std::shared_ptr<Session> session, size_t path_id, ReadAheadKind kind, size_t length, size_t offset) {
198   return session->ReadAhead(path_id, kind, length, offset);
199 }
200 
FinishTask(const TaskId & id)201 void ReadAhead::FinishTask(const TaskId& id) {
202   auto it = impl_->read_ahead_file_map_.find(id.id);
203   if (it == impl_->read_ahead_file_map_.end()) {
204     LOG(DEBUG) << "Could not find any TaskData for " << id;
205     return;
206   }
207 
208   TaskData& task_data = it->second;
209   atrace_async_end(ATRACE_TAG_ACTIVITY_MANAGER,
210                    "ReadAhead Task Scope (for File Descriptors)",
211                    task_data.trace_cookie);
212 
213   auto deleter = [&]() {
214     impl_->read_ahead_file_map_.erase(it);
215   };
216   auto scope_guard = android::base::make_scope_guard(deleter);
217   (void)scope_guard;
218 
219   LOG(VERBOSE) << "ReadAhead (Finish)";
220 
221   if (!impl_->session_manager_->DestroySession(task_data.SessionId())) {
222     LOG(WARNING) << "ReadAhead: Failed to destroy Session " << task_data.SessionId();
223   }
224 }
225 
BeginTaskForSockets(const TaskId & id,int32_t trace_cookie)226 void ReadAhead::BeginTaskForSockets(const TaskId& id, int32_t trace_cookie) {
227   LOG(VERBOSE) << "BeginTaskForSockets: " << id;
228 
229   // TODO: atrace.
230   android::base::Timer timer{};
231   android::base::Timer open_timer{};
232 
233   int trace_file_fd_raw =
234       TEMP_FAILURE_RETRY(open(id.path.c_str(), /*flags*/O_RDONLY));
235 
236   android::base::unique_fd trace_file_fd{trace_file_fd_raw};
237 
238   if (!trace_file_fd.ok()) {
239     PLOG(ERROR) << "ReadAhead failed to open trace file: " << id.path;
240     return;
241   }
242 
243   TaskData task_data;
244   task_data.task_id = id;
245   task_data.trace_cookie = trace_cookie;
246 
247   std::shared_ptr<Session> session =
248       impl_->session_manager_->CreateSession(task_data.SessionId(),
249                                              /*description*/id.path,
250                                              trace_file_fd.get());
251   task_data.session = session;
252   CHECK(session != nullptr);
253 
254   task_data.trace_cookie = ++trace_cookie;
255 
256   // TODO: maybe getprop and a single line by default?
257   session->Dump(LOG_STREAM(INFO), /*multiline*/true);
258 
259   impl_->read_ahead_file_map_[id.id] = std::move(task_data);
260   // FinishTask is identical, as it just destroys the session.
261 }
262 
BeginTask(const TaskId & id)263 void ReadAhead::BeginTask(const TaskId& id) {
264   {
265     struct timeval now;
266     gettimeofday(&now, nullptr);
267 
268     uint64_t now_usec = (now.tv_sec * 1000000LL + now.tv_usec);
269     LOG(DEBUG) << "BeginTask: beginning usec: " << now_usec;
270   }
271 
272   int32_t trace_cookie = ++impl_->trace_cookie_;
273   atrace_async_begin(ATRACE_TAG_ACTIVITY_MANAGER,
274                      "ReadAhead Task Scope (for File Descriptors)",
275                      trace_cookie);
276 
277   if (impl_->UseSockets()) {
278     BeginTaskForSockets(id, trace_cookie);
279     return;
280   }
281 
282   LOG(VERBOSE) << "BeginTask: " << id;
283 
284   // TODO: atrace.
285   android::base::Timer timer{};
286 
287   // TODO: refactor this code with SessionDirect::ProcessFd ?
288   TaskData task_data;
289   task_data.task_id = id;
290   task_data.trace_cookie = trace_cookie;
291 
292   ScopedFormatTrace atrace_begin_task(ATRACE_TAG_ACTIVITY_MANAGER,
293                                       "ReadAhead::BeginTask %s",
294                                       id.path.c_str());
295 
296   // Include CreateSession above the Protobuf deserialization so that we can include
297   // the 'total_duration' as part of the Session dump (relevant when we use IPC mode only).
298   std::shared_ptr<Session> session =
299       impl_->session_manager_->CreateSession(task_data.SessionId(),
300                                              /*description*/id.path);
301 
302   android::base::Timer open_timer{};
303 
304   // XX: Should we rename all the 'Create' to 'Make', or rename the 'Make' to 'Create' ?
305   // Unfortunately make_unique, make_shared, etc is the standard C++ terminology.
306   serialize::ArenaPtr<serialize::proto::TraceFile> trace_file_ptr =
307       serialize::ProtobufIO::Open(id.path);
308 
309   if (trace_file_ptr == nullptr) {
310     // TODO: distinguish between missing trace (this is OK, most apps wont have one)
311     // and a bad error.
312     LOG(DEBUG) << "ReadAhead could not start, missing trace file? " << id.path;
313     return;
314   }
315 
316   task_data.session = session;
317   CHECK(session != nullptr);
318 
319   ReadAheadKind kind = static_cast<ReadAheadKind>(GetPrefetchStrategy());
320 
321   // TODO: The "Task[Id]" should probably be the one owning the trace file.
322   // When the task is fully complete, the task can be deleted and the
323   // associated arenas can go with them.
324 
325   // TODO: we should probably have the file entries all be relative
326   // to the package path?
327 
328   // Open every file in the trace index.
329   const serialize::proto::TraceFileIndex& index = trace_file_ptr->index();
330 
331   size_t count_entries = 0;
332   {
333     ScopedFormatTrace atrace_register_file_paths(ATRACE_TAG_ACTIVITY_MANAGER,
334                                                  "ReadAhead::RegisterFilePaths %s",
335                                                  id.path.c_str());
336     for (const serialize::proto::TraceFileIndexEntry& index_entry : index.entries()) {
337       LOG(VERBOSE) << "ReadAhead: found file entry: " << index_entry.file_name();
338 
339       if (index_entry.id() < 0) {
340         LOG(WARNING) << "ReadAhead: Skip bad TraceFileIndexEntry, negative ID not allowed: "
341                      << index_entry.id();
342         continue;
343       }
344 
345       size_t path_id = index_entry.id();
346       const auto& path_file_name = index_entry.file_name();
347 
348       if (!session->RegisterFilePath(path_id, path_file_name)) {
349         LOG(WARNING) << "ReadAhead: Failed to register file path: " << path_file_name;
350       } else {
351         ++count_entries;
352       }
353     }
354   }
355   LOG(VERBOSE) << "ReadAhead: Registered " << count_entries << " file paths";
356   std::chrono::milliseconds open_duration_ms = open_timer.duration();
357 
358   LOG(DEBUG) << "ReadAhead: Opened file&headers in " << open_duration_ms.count() << "ms";
359 
360   size_t length_sum = 0;
361   size_t prefetch_bytes = 0;
362   size_t entry_offset = 0;
363   {
364     ScopedFormatTrace atrace_perform_read_ahead(ATRACE_TAG_ACTIVITY_MANAGER,
365                                                 "ReadAhead::PerformReadAhead entries=%zu, path=%s",
366                                                 count_entries,
367                                                 id.path.c_str());
368 
369     // Go through every trace entry and readahead every (file,offset,len) tuple.
370     const serialize::proto::TraceFileList& file_list = trace_file_ptr->list();
371     for (const serialize::proto::TraceFileEntry& file_entry : file_list.entries()) {
372       ++entry_offset;
373 
374       if (file_entry.file_length() < 0 || file_entry.file_offset() < 0) {
375         LOG(WARNING) << "ReadAhead entry negative file length or offset, illegal: "
376                      << "index_id=" << file_entry.index_id() << ", skipping";
377         continue;
378       }
379 
380       // Attempt to perform readahead. This can generate more warnings dynamically.
381       if (!PerformReadAhead(session, file_entry.index_id(), kind, file_entry.file_length(), file_entry.file_offset())) {
382         // TODO: Do we need below at all? The always-on Dump already prints a % of failed entries.
383         // LOG(WARNING) << "Failed readahead, bad file length/offset in entry @ " << (entry_offset - 1);
384       } else {
385         prefetch_bytes += static_cast<size_t>(file_entry.file_length());
386         if (prefetch_bytes >= kMaxPrefetchBytes) {
387           LOG(WARNING) << "The prefetching size is "
388                        << prefetch_bytes
389                        << " and it exceeds the threshold "
390                        << kMaxPrefetchBytes;
391           break;
392         }
393       }
394 
395       length_sum += static_cast<size_t>(file_entry.file_length());
396     }
397   }
398 
399   {
400     ScopedFormatTrace atrace_session_dump(ATRACE_TAG_ACTIVITY_MANAGER,
401                                           "ReadAhead Session Dump entries=%zu",
402                                           entry_offset);
403     // TODO: maybe getprop and a single line by default?
404     session->Dump(LOG_STREAM(INFO), /*multiline*/true);
405   }
406 
407   atrace_int(ATRACE_TAG_ACTIVITY_MANAGER,
408              "ReadAhead Bytes Length",
409              static_cast<int32_t>(length_sum));
410 
411   impl_->read_ahead_file_map_[id.id] = std::move(task_data);
412 
413   ReadAhead::Impl::recent_data_keeper_.RecordRecent(id, length_sum);
414 }
415 
Dump(::android::Printer & printer)416 void ReadAhead::Dump(::android::Printer& printer) {
417   ReadAhead::Impl::recent_data_keeper_.Dump(printer);
418 }
419 
PrefetchSizeInBytes(const std::string & file_path)420 std::optional<size_t> ReadAhead::PrefetchSizeInBytes(const std::string& file_path) {
421   serialize::ArenaPtr<serialize::proto::TraceFile> trace_file_ptr =
422       serialize::ProtobufIO::Open(file_path);
423 
424   if (trace_file_ptr == nullptr) {
425     LOG(WARNING) << "PrefetchSizeInBytes: bad file at " << file_path;
426     return std::nullopt;
427   }
428 
429   size_t length_sum = 0;
430   const serialize::proto::TraceFileList& file_list = trace_file_ptr->list();
431   for (const serialize::proto::TraceFileEntry& file_entry : file_list.entries()) {
432 
433     if (file_entry.file_length() < 0 || file_entry.file_offset() < 0) {
434       LOG(WARNING) << "ReadAhead entry negative file length or offset, illegal: "
435                    << "index_id=" << file_entry.index_id() << ", skipping";
436       continue;
437     }
438 
439     length_sum += static_cast<size_t>(file_entry.file_length());
440   }
441 
442   return length_sum;
443 }
444 
445 }  // namespace prefetcher
446 }  // namespace iorap
447 
448