1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/streams/stream.h>
6 
7 #include <algorithm>
8 
9 #include <base/bind.h>
10 #include <brillo/message_loops/message_loop.h>
11 #include <brillo/pointer_utils.h>
12 #include <brillo/streams/stream_errors.h>
13 #include <brillo/streams/stream_utils.h>
14 
15 namespace brillo {
16 
TruncateBlocking(ErrorPtr * error)17 bool Stream::TruncateBlocking(ErrorPtr* error) {
18   return SetSizeBlocking(GetPosition(), error);
19 }
20 
SetPosition(uint64_t position,ErrorPtr * error)21 bool Stream::SetPosition(uint64_t position, ErrorPtr* error) {
22   if (!stream_utils::CheckInt64Overflow(FROM_HERE, position, 0, error))
23     return false;
24   return Seek(position, Whence::FROM_BEGIN, nullptr, error);
25 }
26 
ReadAsync(void * buffer,size_t size_to_read,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)27 bool Stream::ReadAsync(void* buffer,
28                        size_t size_to_read,
29                        const base::Callback<void(size_t)>& success_callback,
30                        const ErrorCallback& error_callback,
31                        ErrorPtr* error) {
32   if (is_async_read_pending_) {
33     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
34                  errors::stream::kOperationNotSupported,
35                  "Another asynchronous operation is still pending");
36     return false;
37   }
38 
39   auto callback = base::Bind(&Stream::IgnoreEOSCallback, success_callback);
40   // If we can read some data right away non-blocking we should still run the
41   // callback from the main loop, so we pass true here for force_async_callback.
42   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
43                        true);
44 }
45 
ReadAllAsync(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)46 bool Stream::ReadAllAsync(void* buffer,
47                           size_t size_to_read,
48                           const base::Closure& success_callback,
49                           const ErrorCallback& error_callback,
50                           ErrorPtr* error) {
51   if (is_async_read_pending_) {
52     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
53                  errors::stream::kOperationNotSupported,
54                  "Another asynchronous operation is still pending");
55     return false;
56   }
57 
58   auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
59                              weak_ptr_factory_.GetWeakPtr(), buffer,
60                              size_to_read, success_callback, error_callback);
61   return ReadAsyncImpl(buffer, size_to_read, callback, error_callback, error,
62                        true);
63 }
64 
ReadBlocking(void * buffer,size_t size_to_read,size_t * size_read,ErrorPtr * error)65 bool Stream::ReadBlocking(void* buffer,
66                           size_t size_to_read,
67                           size_t* size_read,
68                           ErrorPtr* error) {
69   for (;;) {
70     bool eos = false;
71     if (!ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
72       return false;
73 
74     if (*size_read > 0 || eos)
75       break;
76 
77     if (!WaitForDataBlocking(AccessMode::READ, base::TimeDelta::Max(), nullptr,
78                              error)) {
79       return false;
80     }
81   }
82   return true;
83 }
84 
ReadAllBlocking(void * buffer,size_t size_to_read,ErrorPtr * error)85 bool Stream::ReadAllBlocking(void* buffer,
86                              size_t size_to_read,
87                              ErrorPtr* error) {
88   while (size_to_read > 0) {
89     size_t size_read = 0;
90     if (!ReadBlocking(buffer, size_to_read, &size_read, error))
91       return false;
92 
93     if (size_read == 0)
94       return stream_utils::ErrorReadPastEndOfStream(FROM_HERE, error);
95 
96     size_to_read -= size_read;
97     buffer = AdvancePointer(buffer, size_read);
98   }
99   return true;
100 }
101 
WriteAsync(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)102 bool Stream::WriteAsync(const void* buffer,
103                         size_t size_to_write,
104                         const base::Callback<void(size_t)>& success_callback,
105                         const ErrorCallback& error_callback,
106                         ErrorPtr* error) {
107   if (is_async_write_pending_) {
108     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
109                  errors::stream::kOperationNotSupported,
110                  "Another asynchronous operation is still pending");
111     return false;
112   }
113   // If we can read some data right away non-blocking we should still run the
114   // callback from the main loop, so we pass true here for force_async_callback.
115   return WriteAsyncImpl(buffer, size_to_write, success_callback, error_callback,
116                         error, true);
117 }
118 
WriteAllAsync(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr * error)119 bool Stream::WriteAllAsync(const void* buffer,
120                            size_t size_to_write,
121                            const base::Closure& success_callback,
122                            const ErrorCallback& error_callback,
123                            ErrorPtr* error) {
124   if (is_async_write_pending_) {
125     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
126                  errors::stream::kOperationNotSupported,
127                  "Another asynchronous operation is still pending");
128     return false;
129   }
130 
131   auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
132                              weak_ptr_factory_.GetWeakPtr(), buffer,
133                              size_to_write, success_callback, error_callback);
134   return WriteAsyncImpl(buffer, size_to_write, callback, error_callback, error,
135                         true);
136 }
137 
WriteBlocking(const void * buffer,size_t size_to_write,size_t * size_written,ErrorPtr * error)138 bool Stream::WriteBlocking(const void* buffer,
139                            size_t size_to_write,
140                            size_t* size_written,
141                            ErrorPtr* error) {
142   for (;;) {
143     if (!WriteNonBlocking(buffer, size_to_write, size_written, error))
144       return false;
145 
146     if (*size_written > 0 || size_to_write == 0)
147       break;
148 
149     if (!WaitForDataBlocking(AccessMode::WRITE, base::TimeDelta::Max(), nullptr,
150                              error)) {
151       return false;
152     }
153   }
154   return true;
155 }
156 
WriteAllBlocking(const void * buffer,size_t size_to_write,ErrorPtr * error)157 bool Stream::WriteAllBlocking(const void* buffer,
158                               size_t size_to_write,
159                               ErrorPtr* error) {
160   while (size_to_write > 0) {
161     size_t size_written = 0;
162     if (!WriteBlocking(buffer, size_to_write, &size_written, error))
163       return false;
164 
165     if (size_written == 0) {
166       Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
167                    errors::stream::kPartialData,
168                    "Failed to write all the data");
169       return false;
170     }
171     size_to_write -= size_written;
172     buffer = AdvancePointer(buffer, size_written);
173   }
174   return true;
175 }
176 
FlushAsync(const base::Closure & success_callback,const ErrorCallback & error_callback,ErrorPtr *)177 bool Stream::FlushAsync(const base::Closure& success_callback,
178                         const ErrorCallback& error_callback,
179                         ErrorPtr* /* error */) {
180   auto callback = base::Bind(&Stream::FlushAsyncCallback,
181                              weak_ptr_factory_.GetWeakPtr(),
182                              success_callback, error_callback);
183   MessageLoop::current()->PostTask(FROM_HERE, callback);
184   return true;
185 }
186 
IgnoreEOSCallback(const base::Callback<void (size_t)> & success_callback,size_t bytes,bool)187 void Stream::IgnoreEOSCallback(
188     const base::Callback<void(size_t)>& success_callback,
189     size_t bytes,
190     bool /* eos */) {
191   success_callback.Run(bytes);
192 }
193 
ReadAsyncImpl(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)194 bool Stream::ReadAsyncImpl(
195     void* buffer,
196     size_t size_to_read,
197     const base::Callback<void(size_t, bool)>& success_callback,
198     const ErrorCallback& error_callback,
199     ErrorPtr* error,
200     bool force_async_callback) {
201   CHECK(!is_async_read_pending_);
202   // We set this value to true early in the function so calling others will
203   // prevent us from calling WaitForData() to make calls to
204   // ReadAsync() fail while we run WaitForData().
205   is_async_read_pending_ = true;
206 
207   size_t read = 0;
208   bool eos = false;
209   if (!ReadNonBlocking(buffer, size_to_read, &read, &eos, error))
210     return false;
211 
212   if (read > 0 || eos) {
213     if (force_async_callback) {
214       MessageLoop::current()->PostTask(
215           FROM_HERE,
216           base::BindOnce(&Stream::OnReadAsyncDone,
217                          weak_ptr_factory_.GetWeakPtr(),
218                          success_callback, read, eos));
219     } else {
220       is_async_read_pending_ = false;
221       success_callback.Run(read, eos);
222     }
223     return true;
224   }
225 
226   is_async_read_pending_ = WaitForData(
227       AccessMode::READ,
228       base::Bind(&Stream::OnReadAvailable, weak_ptr_factory_.GetWeakPtr(),
229                  buffer, size_to_read, success_callback, error_callback),
230       error);
231   return is_async_read_pending_;
232 }
233 
OnReadAsyncDone(const base::Callback<void (size_t,bool)> & success_callback,size_t bytes_read,bool eos)234 void Stream::OnReadAsyncDone(
235     const base::Callback<void(size_t, bool)>& success_callback,
236     size_t bytes_read,
237     bool eos) {
238   is_async_read_pending_ = false;
239   success_callback.Run(bytes_read, eos);
240 }
241 
OnReadAvailable(void * buffer,size_t size_to_read,const base::Callback<void (size_t,bool)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)242 void Stream::OnReadAvailable(
243     void* buffer,
244     size_t size_to_read,
245     const base::Callback<void(size_t, bool)>& success_callback,
246     const ErrorCallback& error_callback,
247     AccessMode mode) {
248   CHECK(stream_utils::IsReadAccessMode(mode));
249   CHECK(is_async_read_pending_);
250   is_async_read_pending_ = false;
251   ErrorPtr error;
252   // Just reschedule the read operation but don't need to run the callback from
253   // the main loop since we are already running on a callback.
254   if (!ReadAsyncImpl(buffer, size_to_read, success_callback, error_callback,
255                      &error, false)) {
256     error_callback.Run(error.get());
257   }
258 }
259 
WriteAsyncImpl(const void * buffer,size_t size_to_write,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,ErrorPtr * error,bool force_async_callback)260 bool Stream::WriteAsyncImpl(
261     const void* buffer,
262     size_t size_to_write,
263     const base::Callback<void(size_t)>& success_callback,
264     const ErrorCallback& error_callback,
265     ErrorPtr* error,
266     bool force_async_callback) {
267   CHECK(!is_async_write_pending_);
268   // We set this value to true early in the function so calling others will
269   // prevent us from calling WaitForData() to make calls to
270   // ReadAsync() fail while we run WaitForData().
271   is_async_write_pending_ = true;
272 
273   size_t written = 0;
274   if (!WriteNonBlocking(buffer, size_to_write, &written, error))
275     return false;
276 
277   if (written > 0) {
278     if (force_async_callback) {
279       MessageLoop::current()->PostTask(
280           FROM_HERE,
281           base::BindOnce(&Stream::OnWriteAsyncDone,
282                          weak_ptr_factory_.GetWeakPtr(),
283                          success_callback, written));
284     } else {
285       is_async_write_pending_ = false;
286       success_callback.Run(written);
287     }
288     return true;
289   }
290   is_async_write_pending_ = WaitForData(
291       AccessMode::WRITE,
292       base::Bind(&Stream::OnWriteAvailable, weak_ptr_factory_.GetWeakPtr(),
293                  buffer, size_to_write, success_callback, error_callback),
294       error);
295   return is_async_write_pending_;
296 }
297 
OnWriteAsyncDone(const base::Callback<void (size_t)> & success_callback,size_t size_written)298 void Stream::OnWriteAsyncDone(
299     const base::Callback<void(size_t)>& success_callback,
300     size_t size_written) {
301   is_async_write_pending_ = false;
302   success_callback.Run(size_written);
303 }
304 
OnWriteAvailable(const void * buffer,size_t size,const base::Callback<void (size_t)> & success_callback,const ErrorCallback & error_callback,AccessMode mode)305 void Stream::OnWriteAvailable(
306     const void* buffer,
307     size_t size,
308     const base::Callback<void(size_t)>& success_callback,
309     const ErrorCallback& error_callback,
310     AccessMode mode) {
311   CHECK(stream_utils::IsWriteAccessMode(mode));
312   CHECK(is_async_write_pending_);
313   is_async_write_pending_ = false;
314   ErrorPtr error;
315   // Just reschedule the read operation but don't need to run the callback from
316   // the main loop since we are already running on a callback.
317   if (!WriteAsyncImpl(buffer, size, success_callback, error_callback, &error,
318                       false)) {
319     error_callback.Run(error.get());
320   }
321 }
322 
ReadAllAsyncCallback(void * buffer,size_t size_to_read,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_read,bool eos)323 void Stream::ReadAllAsyncCallback(void* buffer,
324                                   size_t size_to_read,
325                                   const base::Closure& success_callback,
326                                   const ErrorCallback& error_callback,
327                                   size_t size_read,
328                                   bool eos) {
329   ErrorPtr error;
330   size_to_read -= size_read;
331   if (size_to_read != 0 && eos) {
332     stream_utils::ErrorReadPastEndOfStream(FROM_HERE, &error);
333     error_callback.Run(error.get());
334     return;
335   }
336 
337   if (size_to_read) {
338     buffer = AdvancePointer(buffer, size_read);
339     auto callback = base::Bind(&Stream::ReadAllAsyncCallback,
340                                weak_ptr_factory_.GetWeakPtr(), buffer,
341                                size_to_read, success_callback, error_callback);
342     if (!ReadAsyncImpl(buffer, size_to_read, callback, error_callback, &error,
343                        false)) {
344       error_callback.Run(error.get());
345     }
346   } else {
347     success_callback.Run();
348   }
349 }
350 
WriteAllAsyncCallback(const void * buffer,size_t size_to_write,const base::Closure & success_callback,const ErrorCallback & error_callback,size_t size_written)351 void Stream::WriteAllAsyncCallback(const void* buffer,
352                                    size_t size_to_write,
353                                    const base::Closure& success_callback,
354                                    const ErrorCallback& error_callback,
355                                    size_t size_written) {
356   ErrorPtr error;
357   if (size_to_write != 0 && size_written == 0) {
358     Error::AddTo(&error, FROM_HERE, errors::stream::kDomain,
359                  errors::stream::kPartialData, "Failed to write all the data");
360     error_callback.Run(error.get());
361     return;
362   }
363   size_to_write -= size_written;
364   if (size_to_write) {
365     buffer = AdvancePointer(buffer, size_written);
366     auto callback = base::Bind(&Stream::WriteAllAsyncCallback,
367                                weak_ptr_factory_.GetWeakPtr(), buffer,
368                                size_to_write, success_callback, error_callback);
369     if (!WriteAsyncImpl(buffer, size_to_write, callback, error_callback, &error,
370                         false)) {
371       error_callback.Run(error.get());
372     }
373   } else {
374     success_callback.Run();
375   }
376 }
377 
FlushAsyncCallback(const base::Closure & success_callback,const ErrorCallback & error_callback)378 void Stream::FlushAsyncCallback(const base::Closure& success_callback,
379                                 const ErrorCallback& error_callback) {
380   ErrorPtr error;
381   if (FlushBlocking(&error)) {
382     success_callback.Run();
383   } else {
384     error_callback.Run(error.get());
385   }
386 }
387 
CancelPendingAsyncOperations()388 void Stream::CancelPendingAsyncOperations() {
389   weak_ptr_factory_.InvalidateWeakPtrs();
390   is_async_read_pending_ = false;
391   is_async_write_pending_ = false;
392 }
393 
394 }  // namespace brillo
395