1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/streams/fake_stream.h>
6 
7 #include <algorithm>
8 #include <utility>
9 
10 #include <base/bind.h>
11 #include <brillo/message_loops/message_loop.h>
12 #include <brillo/streams/stream_utils.h>
13 
14 namespace brillo {
15 
16 namespace {
17 
18 // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)19 base::TimeDelta CalculateDelay(const base::Time& now,
20                                const base::Time& delay_until) {
21   const base::TimeDelta zero_delay;
22   if (delay_until.is_null() || now >= delay_until) {
23     return zero_delay;
24   }
25 
26   base::TimeDelta delay = delay_until - now;
27   if (delay < zero_delay)
28     delay = zero_delay;
29   return delay;
30 }
31 
32 // Given the current clock time, and expected delays for read and write
33 // operations calculates the smaller wait delay of the two and sets the
34 // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)35 void GetMinDelayAndMode(const base::Time& now,
36                         bool read, const base::Time& delay_read_until,
37                         bool write, const base::Time& delay_write_until,
38                         Stream::AccessMode* mode, base::TimeDelta* delay) {
39   base::TimeDelta read_delay = base::TimeDelta::Max();
40   base::TimeDelta write_delay = base::TimeDelta::Max();
41 
42   if (read)
43     read_delay = CalculateDelay(now, delay_read_until);
44   if (write)
45     write_delay = CalculateDelay(now, delay_write_until);
46 
47   if (read_delay > write_delay) {
48     read = false;
49   } else if (read_delay < write_delay) {
50     write = false;
51   }
52   *mode = stream_utils::MakeAccessMode(read, write);
53   *delay = std::min(read_delay, write_delay);
54 }
55 
56 }  // anonymous namespace
57 
FakeStream(Stream::AccessMode mode,base::Clock * clock)58 FakeStream::FakeStream(Stream::AccessMode mode,
59                        base::Clock* clock)
60     : mode_{mode}, clock_{clock} {}
61 
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)62 void FakeStream::AddReadPacketData(base::TimeDelta delay,
63                                    const void* data,
64                                    size_t size) {
65   auto* byte_ptr = static_cast<const uint8_t*>(data);
66   AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
67 }
68 
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)69 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
70   InputDataPacket packet;
71   packet.data = std::move(data);
72   packet.delay_before = delay;
73   incoming_queue_.push(std::move(packet));
74 }
75 
AddReadPacketString(base::TimeDelta delay,const std::string & data)76 void FakeStream::AddReadPacketString(base::TimeDelta delay,
77                                      const std::string& data) {
78   AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
79 }
80 
QueueReadError(base::TimeDelta delay)81 void FakeStream::QueueReadError(base::TimeDelta delay) {
82   QueueReadErrorWithMessage(delay, std::string{});
83 }
84 
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)85 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
86                                            const std::string& message) {
87   InputDataPacket packet;
88   packet.data.assign(message.begin(), message.end());
89   packet.delay_before = delay;
90   packet.read_error = true;
91   incoming_queue_.push(std::move(packet));
92 }
93 
ClearReadQueue()94 void FakeStream::ClearReadQueue() {
95   std::queue<InputDataPacket>().swap(incoming_queue_);
96   delay_input_until_ = base::Time{};
97   input_buffer_.clear();
98   input_ptr_ = 0;
99   report_read_error_ = 0;
100 }
101 
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)102 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
103                                        size_t data_size) {
104   OutputDataPacket packet;
105   packet.expected_size = data_size;
106   packet.delay_before = delay;
107   outgoing_queue_.push(std::move(packet));
108 }
109 
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)110 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
111                                        const void* data,
112                                        size_t size) {
113   auto* byte_ptr = static_cast<const uint8_t*>(data);
114   ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
115 }
116 
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)117 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
118                                        brillo::Blob data) {
119   OutputDataPacket packet;
120   packet.expected_size = data.size();
121   packet.data = std::move(data);
122   packet.delay_before = delay;
123   outgoing_queue_.push(std::move(packet));
124 }
125 
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)126 void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
127                                          const std::string& data) {
128   ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
129 }
130 
QueueWriteError(base::TimeDelta delay)131 void FakeStream::QueueWriteError(base::TimeDelta delay) {
132   QueueWriteErrorWithMessage(delay, std::string{});
133 }
134 
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)135 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
136                                             const std::string& message) {
137   OutputDataPacket packet;
138   packet.expected_size = 0;
139   packet.data.assign(message.begin(), message.end());
140   packet.delay_before = delay;
141   packet.write_error = true;
142   outgoing_queue_.push(std::move(packet));
143 }
144 
ClearWriteQueue()145 void FakeStream::ClearWriteQueue() {
146   std::queue<OutputDataPacket>().swap(outgoing_queue_);
147   delay_output_until_ = base::Time{};
148   output_buffer_.clear();
149   expected_output_data_.clear();
150   max_output_buffer_size_ = 0;
151   all_output_data_.clear();
152   report_write_error_ = 0;
153 }
154 
GetFlushedOutputData() const155 const brillo::Blob& FakeStream::GetFlushedOutputData() const {
156   return all_output_data_;
157 }
158 
GetFlushedOutputDataAsString() const159 std::string FakeStream::GetFlushedOutputDataAsString() const {
160   return std::string{all_output_data_.begin(), all_output_data_.end()};
161 }
162 
CanRead() const163 bool FakeStream::CanRead() const {
164   return stream_utils::IsReadAccessMode(mode_);
165 }
166 
CanWrite() const167 bool FakeStream::CanWrite() const {
168   return stream_utils::IsWriteAccessMode(mode_);
169 }
170 
SetSizeBlocking(uint64_t,ErrorPtr * error)171 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
172   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
173 }
174 
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)175 bool FakeStream::Seek(int64_t /* offset */,
176                       Whence /* whence */,
177                       uint64_t* /* new_position */,
178                       ErrorPtr* error) {
179   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
180 }
181 
IsReadBufferEmpty() const182 bool FakeStream::IsReadBufferEmpty() const {
183   return input_ptr_ >= input_buffer_.size();
184 }
185 
PopReadPacket()186 bool FakeStream::PopReadPacket() {
187   if (incoming_queue_.empty())
188     return false;
189   InputDataPacket& packet = incoming_queue_.front();
190   input_ptr_ = 0;
191   input_buffer_ = std::move(packet.data);
192   delay_input_until_ = clock_->Now() + packet.delay_before;
193   incoming_queue_.pop();
194   report_read_error_ = packet.read_error;
195   return true;
196 }
197 
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)198 bool FakeStream::ReadNonBlocking(void* buffer,
199                                  size_t size_to_read,
200                                  size_t* size_read,
201                                  bool* end_of_stream,
202                                  ErrorPtr* error) {
203   if (!CanRead())
204     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
205 
206   if (!IsOpen())
207     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
208 
209   for (;;) {
210     if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
211       *size_read = 0;
212       if (end_of_stream)
213         *end_of_stream = false;
214       break;
215     }
216 
217     if (report_read_error_) {
218       report_read_error_ = false;
219       std::string message{input_buffer_.begin(), input_buffer_.end()};
220       if (message.empty())
221         message = "Simulating read error for tests";
222       input_buffer_.clear();
223       Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
224       return false;
225     }
226 
227     if (!IsReadBufferEmpty()) {
228       size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
229       std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
230       input_ptr_ += size_to_read;
231       *size_read = size_to_read;
232       if (end_of_stream)
233         *end_of_stream = false;
234       break;
235     }
236 
237     if (!PopReadPacket()) {
238       *size_read = 0;
239       if (end_of_stream)
240         *end_of_stream = true;
241       break;
242     }
243   }
244   return true;
245 }
246 
IsWriteBufferFull() const247 bool FakeStream::IsWriteBufferFull() const {
248   return output_buffer_.size() >= max_output_buffer_size_;
249 }
250 
PopWritePacket()251 bool FakeStream::PopWritePacket() {
252   if (outgoing_queue_.empty())
253     return false;
254   OutputDataPacket& packet = outgoing_queue_.front();
255   expected_output_data_ = std::move(packet.data);
256   delay_output_until_ = clock_->Now() + packet.delay_before;
257   max_output_buffer_size_ = packet.expected_size;
258   report_write_error_ = packet.write_error;
259   outgoing_queue_.pop();
260   return true;
261 }
262 
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)263 bool FakeStream::WriteNonBlocking(const void* buffer,
264                                   size_t size_to_write,
265                                   size_t* size_written,
266                                   ErrorPtr* error) {
267   if (!CanWrite())
268     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
269 
270   if (!IsOpen())
271     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
272 
273   for (;;) {
274     if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
275       *size_written = 0;
276       return true;
277     }
278 
279     if (report_write_error_) {
280       report_write_error_ = false;
281       std::string message{expected_output_data_.begin(),
282                           expected_output_data_.end()};
283       if (message.empty())
284         message = "Simulating write error for tests";
285       output_buffer_.clear();
286       max_output_buffer_size_ = 0;
287       expected_output_data_.clear();
288       Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
289       return false;
290     }
291 
292     if (!IsWriteBufferFull()) {
293       bool success = true;
294       size_to_write = std::min(size_to_write,
295                                max_output_buffer_size_ - output_buffer_.size());
296       auto byte_ptr = static_cast<const uint8_t*>(buffer);
297       output_buffer_.insert(output_buffer_.end(),
298                             byte_ptr, byte_ptr + size_to_write);
299       if (output_buffer_.size()  == max_output_buffer_size_) {
300         if (!expected_output_data_.empty() &&
301             expected_output_data_ != output_buffer_) {
302           // We expected different data to be written, report an error.
303           Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
304                        "Unexpected data written");
305           success = false;
306         }
307 
308         all_output_data_.insert(all_output_data_.end(),
309                                 output_buffer_.begin(), output_buffer_.end());
310 
311         output_buffer_.clear();
312         max_output_buffer_size_ = 0;
313         expected_output_data_.clear();
314       }
315       *size_written = size_to_write;
316       return success;
317     }
318 
319     if (!PopWritePacket()) {
320       // No more data expected.
321       Error::AddTo(error, FROM_HERE, "fake_stream", "full",
322                    "No more output data expected");
323       return false;
324     }
325   }
326 }
327 
FlushBlocking(ErrorPtr * error)328 bool FakeStream::FlushBlocking(ErrorPtr* error) {
329   if (!CanWrite())
330     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
331 
332   if (!IsOpen())
333     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
334 
335   bool success = true;
336   if (!output_buffer_.empty()) {
337     if (!expected_output_data_.empty() &&
338         expected_output_data_ != output_buffer_) {
339       // We expected different data to be written, report an error.
340       Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
341                    "Unexpected data written");
342       success = false;
343     }
344     all_output_data_.insert(all_output_data_.end(),
345                             output_buffer_.begin(), output_buffer_.end());
346 
347     output_buffer_.clear();
348     max_output_buffer_size_ = 0;
349     expected_output_data_.clear();
350   }
351   return success;
352 }
353 
CloseBlocking(ErrorPtr *)354 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
355   is_open_ = false;
356   return true;
357 }
358 
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)359 bool FakeStream::WaitForData(AccessMode mode,
360                              const base::Callback<void(AccessMode)>& callback,
361                              ErrorPtr* error) {
362   bool read_requested = stream_utils::IsReadAccessMode(mode);
363   bool write_requested = stream_utils::IsWriteAccessMode(mode);
364 
365   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
366     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
367 
368   if (read_requested && IsReadBufferEmpty())
369     PopReadPacket();
370   if (write_requested && IsWriteBufferFull())
371     PopWritePacket();
372 
373   base::TimeDelta delay;
374   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
375                      write_requested, delay_output_until_, &mode, &delay);
376   MessageLoop::current()->PostDelayedTask(
377       FROM_HERE, base::Bind(callback, mode), delay);
378   return true;
379 }
380 
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)381 bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
382                                      base::TimeDelta timeout,
383                                      AccessMode* out_mode,
384                                      ErrorPtr* error) {
385   bool read_requested = stream_utils::IsReadAccessMode(in_mode);
386   bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
387 
388   if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
389     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
390 
391   base::TimeDelta delay;
392   GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
393                      write_requested, delay_output_until_, out_mode, &delay);
394 
395   if (timeout < delay)
396     return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
397 
398   LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
399             << " ms.";
400 
401   return true;
402 }
403 
404 }  // namespace brillo
405