1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <brillo/streams/fake_stream.h>
6
7 #include <algorithm>
8 #include <utility>
9
10 #include <base/bind.h>
11 #include <brillo/message_loops/message_loop.h>
12 #include <brillo/streams/stream_utils.h>
13
14 namespace brillo {
15
16 namespace {
17
18 // Gets a delta between the two times, makes sure that the delta is positive.
CalculateDelay(const base::Time & now,const base::Time & delay_until)19 base::TimeDelta CalculateDelay(const base::Time& now,
20 const base::Time& delay_until) {
21 const base::TimeDelta zero_delay;
22 if (delay_until.is_null() || now >= delay_until) {
23 return zero_delay;
24 }
25
26 base::TimeDelta delay = delay_until - now;
27 if (delay < zero_delay)
28 delay = zero_delay;
29 return delay;
30 }
31
32 // Given the current clock time, and expected delays for read and write
33 // operations calculates the smaller wait delay of the two and sets the
34 // resulting operation to |*mode| and the delay to wait for into |*delay|.
GetMinDelayAndMode(const base::Time & now,bool read,const base::Time & delay_read_until,bool write,const base::Time & delay_write_until,Stream::AccessMode * mode,base::TimeDelta * delay)35 void GetMinDelayAndMode(const base::Time& now,
36 bool read, const base::Time& delay_read_until,
37 bool write, const base::Time& delay_write_until,
38 Stream::AccessMode* mode, base::TimeDelta* delay) {
39 base::TimeDelta read_delay = base::TimeDelta::Max();
40 base::TimeDelta write_delay = base::TimeDelta::Max();
41
42 if (read)
43 read_delay = CalculateDelay(now, delay_read_until);
44 if (write)
45 write_delay = CalculateDelay(now, delay_write_until);
46
47 if (read_delay > write_delay) {
48 read = false;
49 } else if (read_delay < write_delay) {
50 write = false;
51 }
52 *mode = stream_utils::MakeAccessMode(read, write);
53 *delay = std::min(read_delay, write_delay);
54 }
55
56 } // anonymous namespace
57
FakeStream(Stream::AccessMode mode,base::Clock * clock)58 FakeStream::FakeStream(Stream::AccessMode mode,
59 base::Clock* clock)
60 : mode_{mode}, clock_{clock} {}
61
AddReadPacketData(base::TimeDelta delay,const void * data,size_t size)62 void FakeStream::AddReadPacketData(base::TimeDelta delay,
63 const void* data,
64 size_t size) {
65 auto* byte_ptr = static_cast<const uint8_t*>(data);
66 AddReadPacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
67 }
68
AddReadPacketData(base::TimeDelta delay,brillo::Blob data)69 void FakeStream::AddReadPacketData(base::TimeDelta delay, brillo::Blob data) {
70 InputDataPacket packet;
71 packet.data = std::move(data);
72 packet.delay_before = delay;
73 incoming_queue_.push(std::move(packet));
74 }
75
AddReadPacketString(base::TimeDelta delay,const std::string & data)76 void FakeStream::AddReadPacketString(base::TimeDelta delay,
77 const std::string& data) {
78 AddReadPacketData(delay, brillo::Blob{data.begin(), data.end()});
79 }
80
QueueReadError(base::TimeDelta delay)81 void FakeStream::QueueReadError(base::TimeDelta delay) {
82 QueueReadErrorWithMessage(delay, std::string{});
83 }
84
QueueReadErrorWithMessage(base::TimeDelta delay,const std::string & message)85 void FakeStream::QueueReadErrorWithMessage(base::TimeDelta delay,
86 const std::string& message) {
87 InputDataPacket packet;
88 packet.data.assign(message.begin(), message.end());
89 packet.delay_before = delay;
90 packet.read_error = true;
91 incoming_queue_.push(std::move(packet));
92 }
93
ClearReadQueue()94 void FakeStream::ClearReadQueue() {
95 std::queue<InputDataPacket>().swap(incoming_queue_);
96 delay_input_until_ = base::Time{};
97 input_buffer_.clear();
98 input_ptr_ = 0;
99 report_read_error_ = 0;
100 }
101
ExpectWritePacketSize(base::TimeDelta delay,size_t data_size)102 void FakeStream::ExpectWritePacketSize(base::TimeDelta delay,
103 size_t data_size) {
104 OutputDataPacket packet;
105 packet.expected_size = data_size;
106 packet.delay_before = delay;
107 outgoing_queue_.push(std::move(packet));
108 }
109
ExpectWritePacketData(base::TimeDelta delay,const void * data,size_t size)110 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
111 const void* data,
112 size_t size) {
113 auto* byte_ptr = static_cast<const uint8_t*>(data);
114 ExpectWritePacketData(delay, brillo::Blob{byte_ptr, byte_ptr + size});
115 }
116
ExpectWritePacketData(base::TimeDelta delay,brillo::Blob data)117 void FakeStream::ExpectWritePacketData(base::TimeDelta delay,
118 brillo::Blob data) {
119 OutputDataPacket packet;
120 packet.expected_size = data.size();
121 packet.data = std::move(data);
122 packet.delay_before = delay;
123 outgoing_queue_.push(std::move(packet));
124 }
125
ExpectWritePacketString(base::TimeDelta delay,const std::string & data)126 void FakeStream::ExpectWritePacketString(base::TimeDelta delay,
127 const std::string& data) {
128 ExpectWritePacketData(delay, brillo::Blob{data.begin(), data.end()});
129 }
130
QueueWriteError(base::TimeDelta delay)131 void FakeStream::QueueWriteError(base::TimeDelta delay) {
132 QueueWriteErrorWithMessage(delay, std::string{});
133 }
134
QueueWriteErrorWithMessage(base::TimeDelta delay,const std::string & message)135 void FakeStream::QueueWriteErrorWithMessage(base::TimeDelta delay,
136 const std::string& message) {
137 OutputDataPacket packet;
138 packet.expected_size = 0;
139 packet.data.assign(message.begin(), message.end());
140 packet.delay_before = delay;
141 packet.write_error = true;
142 outgoing_queue_.push(std::move(packet));
143 }
144
ClearWriteQueue()145 void FakeStream::ClearWriteQueue() {
146 std::queue<OutputDataPacket>().swap(outgoing_queue_);
147 delay_output_until_ = base::Time{};
148 output_buffer_.clear();
149 expected_output_data_.clear();
150 max_output_buffer_size_ = 0;
151 all_output_data_.clear();
152 report_write_error_ = 0;
153 }
154
GetFlushedOutputData() const155 const brillo::Blob& FakeStream::GetFlushedOutputData() const {
156 return all_output_data_;
157 }
158
GetFlushedOutputDataAsString() const159 std::string FakeStream::GetFlushedOutputDataAsString() const {
160 return std::string{all_output_data_.begin(), all_output_data_.end()};
161 }
162
CanRead() const163 bool FakeStream::CanRead() const {
164 return stream_utils::IsReadAccessMode(mode_);
165 }
166
CanWrite() const167 bool FakeStream::CanWrite() const {
168 return stream_utils::IsWriteAccessMode(mode_);
169 }
170
SetSizeBlocking(uint64_t,ErrorPtr * error)171 bool FakeStream::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
172 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
173 }
174
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)175 bool FakeStream::Seek(int64_t /* offset */,
176 Whence /* whence */,
177 uint64_t* /* new_position */,
178 ErrorPtr* error) {
179 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
180 }
181
IsReadBufferEmpty() const182 bool FakeStream::IsReadBufferEmpty() const {
183 return input_ptr_ >= input_buffer_.size();
184 }
185
PopReadPacket()186 bool FakeStream::PopReadPacket() {
187 if (incoming_queue_.empty())
188 return false;
189 InputDataPacket& packet = incoming_queue_.front();
190 input_ptr_ = 0;
191 input_buffer_ = std::move(packet.data);
192 delay_input_until_ = clock_->Now() + packet.delay_before;
193 incoming_queue_.pop();
194 report_read_error_ = packet.read_error;
195 return true;
196 }
197
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)198 bool FakeStream::ReadNonBlocking(void* buffer,
199 size_t size_to_read,
200 size_t* size_read,
201 bool* end_of_stream,
202 ErrorPtr* error) {
203 if (!CanRead())
204 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
205
206 if (!IsOpen())
207 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
208
209 for (;;) {
210 if (!delay_input_until_.is_null() && clock_->Now() < delay_input_until_) {
211 *size_read = 0;
212 if (end_of_stream)
213 *end_of_stream = false;
214 break;
215 }
216
217 if (report_read_error_) {
218 report_read_error_ = false;
219 std::string message{input_buffer_.begin(), input_buffer_.end()};
220 if (message.empty())
221 message = "Simulating read error for tests";
222 input_buffer_.clear();
223 Error::AddTo(error, FROM_HERE, "fake_stream", "read_error", message);
224 return false;
225 }
226
227 if (!IsReadBufferEmpty()) {
228 size_to_read = std::min(size_to_read, input_buffer_.size() - input_ptr_);
229 std::memcpy(buffer, input_buffer_.data() + input_ptr_, size_to_read);
230 input_ptr_ += size_to_read;
231 *size_read = size_to_read;
232 if (end_of_stream)
233 *end_of_stream = false;
234 break;
235 }
236
237 if (!PopReadPacket()) {
238 *size_read = 0;
239 if (end_of_stream)
240 *end_of_stream = true;
241 break;
242 }
243 }
244 return true;
245 }
246
IsWriteBufferFull() const247 bool FakeStream::IsWriteBufferFull() const {
248 return output_buffer_.size() >= max_output_buffer_size_;
249 }
250
PopWritePacket()251 bool FakeStream::PopWritePacket() {
252 if (outgoing_queue_.empty())
253 return false;
254 OutputDataPacket& packet = outgoing_queue_.front();
255 expected_output_data_ = std::move(packet.data);
256 delay_output_until_ = clock_->Now() + packet.delay_before;
257 max_output_buffer_size_ = packet.expected_size;
258 report_write_error_ = packet.write_error;
259 outgoing_queue_.pop();
260 return true;
261 }
262
WriteNonBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)263 bool FakeStream::WriteNonBlocking(const void* buffer,
264 size_t size_to_write,
265 size_t* size_written,
266 ErrorPtr* error) {
267 if (!CanWrite())
268 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
269
270 if (!IsOpen())
271 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
272
273 for (;;) {
274 if (!delay_output_until_.is_null() && clock_->Now() < delay_output_until_) {
275 *size_written = 0;
276 return true;
277 }
278
279 if (report_write_error_) {
280 report_write_error_ = false;
281 std::string message{expected_output_data_.begin(),
282 expected_output_data_.end()};
283 if (message.empty())
284 message = "Simulating write error for tests";
285 output_buffer_.clear();
286 max_output_buffer_size_ = 0;
287 expected_output_data_.clear();
288 Error::AddTo(error, FROM_HERE, "fake_stream", "write_error", message);
289 return false;
290 }
291
292 if (!IsWriteBufferFull()) {
293 bool success = true;
294 size_to_write = std::min(size_to_write,
295 max_output_buffer_size_ - output_buffer_.size());
296 auto byte_ptr = static_cast<const uint8_t*>(buffer);
297 output_buffer_.insert(output_buffer_.end(),
298 byte_ptr, byte_ptr + size_to_write);
299 if (output_buffer_.size() == max_output_buffer_size_) {
300 if (!expected_output_data_.empty() &&
301 expected_output_data_ != output_buffer_) {
302 // We expected different data to be written, report an error.
303 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
304 "Unexpected data written");
305 success = false;
306 }
307
308 all_output_data_.insert(all_output_data_.end(),
309 output_buffer_.begin(), output_buffer_.end());
310
311 output_buffer_.clear();
312 max_output_buffer_size_ = 0;
313 expected_output_data_.clear();
314 }
315 *size_written = size_to_write;
316 return success;
317 }
318
319 if (!PopWritePacket()) {
320 // No more data expected.
321 Error::AddTo(error, FROM_HERE, "fake_stream", "full",
322 "No more output data expected");
323 return false;
324 }
325 }
326 }
327
FlushBlocking(ErrorPtr * error)328 bool FakeStream::FlushBlocking(ErrorPtr* error) {
329 if (!CanWrite())
330 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
331
332 if (!IsOpen())
333 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
334
335 bool success = true;
336 if (!output_buffer_.empty()) {
337 if (!expected_output_data_.empty() &&
338 expected_output_data_ != output_buffer_) {
339 // We expected different data to be written, report an error.
340 Error::AddTo(error, FROM_HERE, "fake_stream", "data_mismatch",
341 "Unexpected data written");
342 success = false;
343 }
344 all_output_data_.insert(all_output_data_.end(),
345 output_buffer_.begin(), output_buffer_.end());
346
347 output_buffer_.clear();
348 max_output_buffer_size_ = 0;
349 expected_output_data_.clear();
350 }
351 return success;
352 }
353
CloseBlocking(ErrorPtr *)354 bool FakeStream::CloseBlocking(ErrorPtr* /* error */) {
355 is_open_ = false;
356 return true;
357 }
358
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)359 bool FakeStream::WaitForData(AccessMode mode,
360 const base::Callback<void(AccessMode)>& callback,
361 ErrorPtr* error) {
362 bool read_requested = stream_utils::IsReadAccessMode(mode);
363 bool write_requested = stream_utils::IsWriteAccessMode(mode);
364
365 if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
366 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
367
368 if (read_requested && IsReadBufferEmpty())
369 PopReadPacket();
370 if (write_requested && IsWriteBufferFull())
371 PopWritePacket();
372
373 base::TimeDelta delay;
374 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
375 write_requested, delay_output_until_, &mode, &delay);
376 MessageLoop::current()->PostDelayedTask(
377 FROM_HERE, base::Bind(callback, mode), delay);
378 return true;
379 }
380
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)381 bool FakeStream::WaitForDataBlocking(AccessMode in_mode,
382 base::TimeDelta timeout,
383 AccessMode* out_mode,
384 ErrorPtr* error) {
385 bool read_requested = stream_utils::IsReadAccessMode(in_mode);
386 bool write_requested = stream_utils::IsWriteAccessMode(in_mode);
387
388 if ((read_requested && !CanRead()) || (write_requested && !CanWrite()))
389 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
390
391 base::TimeDelta delay;
392 GetMinDelayAndMode(clock_->Now(), read_requested, delay_input_until_,
393 write_requested, delay_output_until_, out_mode, &delay);
394
395 if (timeout < delay)
396 return stream_utils::ErrorOperationTimeout(FROM_HERE, error);
397
398 LOG(INFO) << "TEST: Would have blocked for " << delay.InMilliseconds()
399 << " ms.";
400
401 return true;
402 }
403
404 } // namespace brillo
405