1 /*
2  * Copyright (C) 2014 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LogReaderThread.h"
18 
19 #include <errno.h>
20 #include <string.h>
21 #include <sys/prctl.h>
22 
23 #include <thread>
24 
25 #include "LogBuffer.h"
26 #include "LogReaderList.h"
27 #include "SerializedFlushToState.h"
28 
LogReaderThread(LogBuffer * log_buffer,LogReaderList * reader_list,std::unique_ptr<LogWriter> writer,bool non_block,unsigned long tail,LogMask log_mask,pid_t pid,log_time start_time,uint64_t start,std::chrono::steady_clock::time_point deadline)29 LogReaderThread::LogReaderThread(LogBuffer* log_buffer, LogReaderList* reader_list,
30                                  std::unique_ptr<LogWriter> writer, bool non_block,
31                                  unsigned long tail, LogMask log_mask, pid_t pid,
32                                  log_time start_time, uint64_t start,
33                                  std::chrono::steady_clock::time_point deadline)
34     : log_buffer_(log_buffer),
35       reader_list_(reader_list),
36       writer_(std::move(writer)),
37       pid_(pid),
38       tail_(tail),
39       count_(0),
40       index_(0),
41       start_time_(start_time),
42       deadline_(deadline),
43       non_block_(non_block) {
44     CleanSkip();
45     flush_to_state_ = log_buffer_->CreateFlushToState(start, log_mask);
46     auto thread = std::thread{&LogReaderThread::ThreadFunction, this};
47     thread.detach();
48 }
49 
ThreadFunction()50 void LogReaderThread::ThreadFunction() {
51     prctl(PR_SET_NAME, "logd.reader.per");
52 
53     auto lock = std::unique_lock{logd_lock};
54     auto lock_assertion = android::base::ScopedLockAssertion{logd_lock};
55 
56     while (!release_) {
57         if (deadline_.time_since_epoch().count() != 0) {
58             if (thread_triggered_condition_.wait_until(lock, deadline_) ==
59                 std::cv_status::timeout) {
60                 deadline_ = {};
61             }
62             if (release_) {
63                 break;
64             }
65         }
66 
67         if (tail_) {
68             auto first_pass_state = log_buffer_->CreateFlushToState(flush_to_state_->start(),
69                                                                     flush_to_state_->log_mask());
70             log_buffer_->FlushTo(writer_.get(), *first_pass_state,
71                                  [this](log_id_t log_id, pid_t pid, uint64_t sequence,
72                                         log_time realtime) REQUIRES(logd_lock) {
73                                      return FilterFirstPass(log_id, pid, sequence, realtime);
74                                  });
75         }
76         bool flush_success = log_buffer_->FlushTo(
77                 writer_.get(), *flush_to_state_,
78                 [this](log_id_t log_id, pid_t pid, uint64_t sequence, log_time realtime) REQUIRES(
79                         logd_lock) { return FilterSecondPass(log_id, pid, sequence, realtime); });
80 
81         // We only ignore entries before the original start time for the first flushTo(), if we
82         // get entries after this first flush before the original start time, then the client
83         // wouldn't have seen them.
84         // Note: this is still racy and may skip out of order events that came in since the last
85         // time the client disconnected and then reconnected with the new start time.  The long term
86         // solution here is that clients must request events since a specific sequence number.
87         start_time_.tv_sec = 0;
88         start_time_.tv_nsec = 0;
89 
90         if (!flush_success) {
91             break;
92         }
93 
94         if (non_block_ || release_) {
95             break;
96         }
97 
98         CleanSkip();
99 
100         if (deadline_.time_since_epoch().count() == 0) {
101             thread_triggered_condition_.wait(lock);
102         }
103     }
104 
105     writer_->Release();
106 
107     auto& log_reader_threads = reader_list_->reader_threads();
108     auto it = std::find_if(log_reader_threads.begin(), log_reader_threads.end(),
109                            [this](const auto& other) { return other.get() == this; });
110 
111     if (it != log_reader_threads.end()) {
112         log_reader_threads.erase(it);
113     }
114 }
115 
116 // A first pass to count the number of elements
FilterFirstPass(log_id_t,pid_t pid,uint64_t,log_time realtime)117 FilterResult LogReaderThread::FilterFirstPass(log_id_t, pid_t pid, uint64_t, log_time realtime) {
118     if ((!pid_ || pid_ == pid) && (start_time_ == log_time::EPOCH || start_time_ <= realtime)) {
119         ++count_;
120     }
121 
122     return FilterResult::kSkip;
123 }
124 
125 // A second pass to send the selected elements
FilterSecondPass(log_id_t log_id,pid_t pid,uint64_t,log_time realtime)126 FilterResult LogReaderThread::FilterSecondPass(log_id_t log_id, pid_t pid, uint64_t,
127                                                log_time realtime) {
128     if (skip_ahead_[log_id]) {
129         skip_ahead_[log_id]--;
130         return FilterResult::kSkip;
131     }
132 
133     // Truncate to close race between first and second pass
134     if (non_block_ && tail_ && index_ >= count_) {
135         return FilterResult::kStop;
136     }
137 
138     if (pid_ && pid_ != pid) {
139         return FilterResult::kSkip;
140     }
141 
142     if (start_time_ != log_time::EPOCH && realtime <= start_time_) {
143         return FilterResult::kSkip;
144     }
145 
146     if (release_) {
147         return FilterResult::kStop;
148     }
149 
150     if (!tail_) {
151         goto ok;
152     }
153 
154     ++index_;
155 
156     if (count_ > tail_ && index_ <= (count_ - tail_)) {
157         return FilterResult::kSkip;
158     }
159 
160     if (!non_block_) {
161         tail_ = 0;
162     }
163 
164 ok:
165     if (!skip_ahead_[log_id]) {
166         return FilterResult::kWrite;
167     }
168     return FilterResult::kSkip;
169 }
170