1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef LIBBRILLO_BRILLO_STREAMS_STREAM_H_
6 #define LIBBRILLO_BRILLO_STREAMS_STREAM_H_
7 
8 #include <cstdint>
9 #include <memory>
10 
11 #include <base/callback.h>
12 #include <base/macros.h>
13 #include <base/memory/weak_ptr.h>
14 #include <base/time/time.h>
15 #include <brillo/brillo_export.h>
16 #include <brillo/errors/error.h>
17 
18 namespace brillo {
19 
20 // Stream is a base class that specific stream storage implementations must
21 // derive from to provide I/O facilities.
22 // The stream class provides general streaming I/O primitives to read, write and
23 // seek within a stream. It has methods for asynchronous (callback-based) as
24 // well as synchronous (both blocking and non-blocking) operations.
25 // The Stream class is abstract and cannot be created by itself.
26 // In order to construct a stream, you must use one of the derived classes'
27 // factory methods which return a stream smart pointer (StreamPtr):
28 //
29 //    StreamPtr input_stream = FileStream::Open(path, AccessMode::READ);
30 //    StreamPtr output_stream = MemoryStream::Create();
31 //    uint8_t buf[1000];
32 //    size_t read = 0;
33 //    while (input_stream->ReadBlocking(buf, sizeof(buf), &read, nullptr)) {
34 //      if (read == 0) break;
35 //      output_stream->WriteAllBlocking(buf, read, nullptr);
36 //    }
37 //
38 // NOTE ABOUT ASYNCHRONOUS OPERATIONS: Asynchronous I/O relies on a MessageLoop
39 // instance to be present on the current thread. Using Stream::ReadAsync(),
40 // Stream::WriteAsync() and similar will call MessageLoop::current() to access
41 // the current message loop and abort if there isn't one for the current thread.
42 // Also, only one outstanding asynchronous operation of particular kind (reading
43 // or writing) at a time is supported. Trying to call ReadAsync() while another
44 // asynchronous read operation is pending will fail with an error
45 // ("operation_not_supported").
46 //
47 // NOTE ABOUT READING FROM/WRITING TO STREAMS: In many cases underlying streams
48 // use buffered I/O. Using all read/write methods other than ReadAllAsync(),
49 // ReadAllBlocking(), WriteAllAsync(), WriteAllBlocking() will return
50 // immediately if there is any data available in the underlying buffer. That is,
51 // trying to read 1000 bytes while the internal buffer contains only 100 will
52 // return immediately with just those 100 bytes and no blocking or other I/O
53 // traffic will be incurred. This guarantee is important for efficient and
54 // correct implementation of duplex communication over pipes and sockets.
55 //
56 // NOTE TO IMPLEMENTERS: When creating new stream types, you must derive
57 // from this class and provide the implementation for its pure virtual methods.
58 // For operations that do not apply to your stream, make sure the corresponding
59 // methods return "false" and set the error to "operation_not_supported".
// You should use stream_utils::ErrorOperationNotSupported() for this. Also
// make sure the stream capability functions like CanRead(), etc. return
// correct values:
63 //
64 //    bool MyReadOnlyStream::CanRead() const { return true; }
65 //    bool MyReadOnlyStream::CanWrite() const { return false; }
66 //    bool MyReadOnlyStream::WriteBlocking(const void* buffer,
67 //                                         size_t size_to_write,
68 //                                         size_t* size_written,
69 //                                         ErrorPtr* error) {
70 //      return stream_utils::ErrorOperationNotSupported(error);
71 //    }
72 //
// The class should also provide a static factory method to create/open
// a new stream:
75 //
76 //    static StreamPtr MyReadOnlyStream::Open(..., ErrorPtr* error) {
77 //      auto my_stream = std::make_unique<MyReadOnlyStream>(...);
//      if (!my_stream->Initialize(..., error)) {
79 //        my_stream.reset();
80 //      }
81 //      return my_stream;
82 //    }
83 //
84 class BRILLO_EXPORT Stream {
85  public:
86   // When seeking in streams, whence specifies the origin of the seek operation.
87   enum class Whence { FROM_BEGIN, FROM_CURRENT, FROM_END };
88   // Stream access mode for open operations (used in derived classes).
89   enum class AccessMode { READ, WRITE, READ_WRITE };
90 
91   // Standard error callback for asynchronous operations.
92   using ErrorCallback = base::Callback<void(const Error*)>;
93 
94   virtual ~Stream() = default;
95 
96   // == Stream capabilities ===================================================
97 
98   // Returns true while stream is open. Closing the last reference to the stream
99   // will make this method return false.
100   virtual bool IsOpen() const = 0;
101 
102   // Called to determine if read operations are supported on the stream (stream
103   // is readable). This method does not check if there is actually any data to
104   // read, only the fact that the stream is open in read mode and can be read
105   // from in general.
106   // If CanRead() returns false, it is guaranteed that the stream can't be
107   // read from. However, if it returns true, there is no guarantee that the
108   // subsequent read operation will actually succeed (for example, the stream
109   // position could be at the end of the data stream, or the access mode of
110   // the stream is unknown beforehand).
111   virtual bool CanRead() const = 0;
112 
113   // Called to determine if write operations are supported on the stream (stream
114   // is writable).
115   // If CanWrite() returns false, it is guaranteed that the stream can't be
116   // written to. However, if it returns true, the subsequent write operation
117   // is not guaranteed to succeed (e.g. the output media could be out of free
118   // space or a transport error could occur).
119   virtual bool CanWrite() const = 0;
120 
121   // Called to determine if random access I/O operations are supported on
122   // the stream. Sequential streams should return false.
123   // If CanSeek() returns false, it is guaranteed that the stream can't use
124   // Seek(). However, if it returns true, it might be possible to seek, but this
125   // is not guaranteed since the actual underlying stream capabilities might
126   // not be known.
127   // Note that non-seekable streams might still maintain the current stream
128   // position and GetPosition method might still be used even if CanSeek()
129   // returns false. However SetPosition() will almost always fail in such
130   // a case.
131   virtual bool CanSeek() const = 0;
132 
133   // Called to determine if the size of the stream is known. Size of some
134   // sequential streams (e.g. based on pipes) is unknown beforehand, so this
135   // method can be used to check how reliable a call to GetSize() is.
136   virtual bool CanGetSize() const = 0;
137 
138   // == Stream size operations ================================================
139 
140   // Returns the size of stream data.
141   // If the stream size is unavailable/unknown, it returns 0.
142   virtual uint64_t GetSize() const = 0;
143 
144   // Resizes the stream storage to |size|. Stream must be writable and support
145   // this operation.
146   virtual bool SetSizeBlocking(uint64_t size, ErrorPtr* error) = 0;
147 
148   // Truncates the stream at the current stream pointer.
149   // Calls SetSizeBlocking(GetPosition(), ...).
150   bool TruncateBlocking(ErrorPtr* error);
151 
152   // Returns the amount of data remaining in the stream. If the size of the
153   // stream is unknown, or if the stream pointer is at or past the end of the
154   // stream, the function returns 0.
155   virtual uint64_t GetRemainingSize() const = 0;
156 
157   // == Seek operations =======================================================
158 
159   // Gets the position of the stream I/O pointer from the beginning of the
160   // stream. If the stream position is unavailable/unknown, it returns 0.
161   virtual uint64_t GetPosition() const = 0;
162 
163   // Moves the stream pointer to the specified position, relative to the
164   // beginning of the stream. This calls Seek(position, Whence::FROM_BEGIN),
165   // however it also provides proper |position| validation to ensure that
166   // it doesn't overflow the range of signed int64_t used by Seek.
167   bool SetPosition(uint64_t position, ErrorPtr* error);
168 
169   // Moves the stream pointer by |offset| bytes relative to |whence|.
170   // When successful, returns true and sets the new pointer position from the
171   // beginning of the stream to |new_position|. If |new_position| is nullptr,
172   // new stream position is not returned.
173   // On error, returns false and specifies additional details in |error| if it
174   // is not nullptr.
175   virtual bool Seek(int64_t offset,
176                     Whence whence,
177                     uint64_t* new_position,
178                     ErrorPtr* error) = 0;
179 
180   // == Read operations =======================================================
181 
182   // -- Asynchronous ----------------------------------------------------------
183 
184   // Reads up to |size_to_read| bytes from the stream asynchronously. It is not
185   // guaranteed that all requested data will be read. It is not an error for
186   // this function to read fewer bytes than requested. If the function reads
187   // zero bytes, it means that the end of stream is reached.
188   // Upon successful read, the |success_callback| will be invoked with the
189   // actual number of bytes read.
190   // If an error occurs during the asynchronous operation, the |error_callback|
191   // is invoked with the error details. The error object pointer passed in as a
192   // parameter to the |error_callback| is valid only for the duration of that
193   // callback.
194   // If this function successfully schedules an asynchronous operation, it
195   // returns true. If it fails immediately, it will return false and set the
196   // error details to |error| object and will not call the success or error
197   // callbacks.
198   // The |buffer| must be at least |size_to_read| in size and must remain
199   // valid for the duration of the asynchronous operation (until either
200   // |success_callback| or |error_callback| is called).
201   // Only one asynchronous operation at a time is allowed on the stream (read
202   // and/or write)
203   // Uses ReadNonBlocking() and MonitorDataAvailable().
204   virtual bool ReadAsync(void* buffer,
205                          size_t size_to_read,
206                          const base::Callback<void(size_t)>& success_callback,
207                          const ErrorCallback& error_callback,
208                          ErrorPtr* error);
209 
210   // Similar to ReadAsync() operation above but reads exactly |size_to_read|
211   // bytes from the stream into the |buffer|. Attempt to read past the end of
212   // the stream is considered an error in this case and will trigger the
213   // |error_callback|. The rest of restrictions and conditions of ReadAsync()
214   // method applies to ReadAllAsync() as well.
215   // Uses ReadNonBlocking() and MonitorDataAvailable().
216   virtual bool ReadAllAsync(void* buffer,
217                             size_t size_to_read,
218                             const base::Closure& success_callback,
219                             const ErrorCallback& error_callback,
220                             ErrorPtr* error);
221 
222   // -- Synchronous non-blocking ----------------------------------------------
223 
224   // Reads up to |size_to_read| bytes from the stream without blocking.
225   // The |buffer| must be at least |size_to_read| in size. It is not an error
226   // for this function to return without reading all (or any) the data.
227   // The actual amount of data read (which could be 0 bytes) is returned in
228   // |size_read|.
229   // On error, the function returns false and specifies additional error details
230   // in |error|.
231   // If end of stream is reached or if no data is currently available to be read
232   // without blocking, |size_read| will contain 0 and the function will still
233   // return true (success). In case of end-of-stream scenario, |end_of_stream|
234   // will also be set to true to indicate that no more data is available.
235   virtual bool ReadNonBlocking(void* buffer,
236                                size_t size_to_read,
237                                size_t* size_read,
238                                bool* end_of_stream,
239                                ErrorPtr* error) = 0;
240 
241   // -- Synchronous blocking --------------------------------------------------
242 
243   // Reads up to |size_to_read| bytes from the stream. This function will block
244   // until at least one byte is read or the end of stream is reached or until
245   // the stream is closed.
246   // The |buffer| must be at least |size_to_read| in size. It is not an error
247   // for this function to return without reading all the data. The actual amount
248   // of data read (which could be 0 bytes) is returned in |size_read|.
249   // On error, the function returns false and specifies additional error details
250   // in |error|. In this case, the state of the stream pointer is undefined,
251   // since some bytes might have been read successfully (and the pointer moved)
252   // before the error has occurred and |size_read| is not updated.
253   // If end of stream is reached, |size_read| will contain 0 and the function
254   // will still return true (success).
255   virtual bool ReadBlocking(void* buffer,
256                             size_t size_to_read,
257                             size_t* size_read,
258                             ErrorPtr* error);
259 
260   // Reads exactly |size_to_read| bytes to |buffer|. Returns false on error
261   // (reading fewer than requested bytes is treated as an error as well).
262   // Calls ReadAllBlocking() repeatedly until all the data is read.
263   virtual bool ReadAllBlocking(void* buffer,
264                                size_t size_to_read,
265                                ErrorPtr* error);
266 
267   // == Write operations ======================================================
268 
269   // -- Asynchronous ----------------------------------------------------------
270 
271   // Writes up to |size_to_write| bytes from |buffer| to the stream
272   // asynchronously. It is not guaranteed that all requested data will be
273   // written. It is not an error for this function to write fewer bytes than
274   // requested.
275   // Upon successful write, the |success_callback| will be invoked with the
276   // actual number of bytes written.
277   // If an error occurs during the asynchronous operation, the |error_callback|
278   // is invoked with the error details. The error object pointer is valid only
279   // for the duration of the error callback.
280   // If this function successfully schedules an asynchronous operation, it
281   // returns true. If it fails immediately, it will return false and set the
282   // error details to |error| object and will not call the success or error
283   // callbacks.
284   // The |buffer| must be at least |size_to_write| in size and must remain
285   // valid for the duration of the asynchronous operation (until either
286   // |success_callback| or |error_callback| is called).
287   // Only one asynchronous operation at a time is allowed on the stream (read
288   // and/or write).
289   // Uses WriteNonBlocking() and MonitorDataAvailable().
290   virtual bool WriteAsync(const void* buffer,
291                           size_t size_to_write,
292                           const base::Callback<void(size_t)>& success_callback,
293                           const ErrorCallback& error_callback,
294                           ErrorPtr* error);
295 
296   // Similar to WriteAsync() operation above but writes exactly |size_to_write|
297   // bytes from |buffet| to the stream. When all the data is written
298   // successfully, the |success_callback| is invoked.
299   // The rest of restrictions and conditions of WriteAsync() method applies to
300   // WriteAllAsync() as well.
301   // Uses WriteNonBlocking() and MonitorDataAvailable().
302   virtual bool WriteAllAsync(const void* buffer,
303                              size_t size_to_write,
304                              const base::Closure& success_callback,
305                              const ErrorCallback& error_callback,
306                              ErrorPtr* error);
307 
308   // -- Synchronous non-blocking ----------------------------------------------
309 
310   // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at
311   // least |size_to_write| in size. It is not an error for this function to
312   // return without writing all the data requested (or any data at all).
313   // The actual amount of data written is returned in |size_written|.
314   // On error, the function returns false and specifies additional error details
315   // in |error|.
316   virtual bool WriteNonBlocking(const void* buffer,
317                                 size_t size_to_write,
318                                 size_t* size_written,
319                                 ErrorPtr* error) = 0;
320 
321   // -- Synchronous blocking --------------------------------------------------
322 
323   // Writes up to |size_to_write| bytes to the stream. The |buffer| must be at
324   // least |size_to_write| in size. It is not an error for this function to
325   // return without writing all the data requested. The actual amount of data
326   // written is returned in |size_written|.
327   // On error, the function returns false and specifies additional error details
328   // in |error|.
329   virtual bool WriteBlocking(const void* buffer,
330                              size_t size_to_write,
331                              size_t* size_written,
332                              ErrorPtr* error);
333 
334   // Writes exactly |size_to_write| bytes to |buffer|. Returns false on error
335   // (writing fewer than requested bytes is treated as an error as well).
336   // Calls WriteBlocking() repeatedly until all the data is written.
337   virtual bool WriteAllBlocking(const void* buffer,
338                                 size_t size_to_write,
339                                 ErrorPtr* error);
340 
341   // == Finalizing/closing streams  ===========================================
342 
343   // Flushes all the user-space data from cache output buffers to storage
344   // medium. For read-only streams this is a no-op, however it is still valid
345   // to call this method on read-only streams.
346   // If an error occurs, the function returns false and specifies additional
347   // error details in |error|.
348   virtual bool FlushBlocking(ErrorPtr* error) = 0;
349 
350   // Flushes all the user-space data from the cache output buffer
351   // asynchronously. When all the data is successfully flushed, the
352   // |success_callback| is invoked. If an error occurs while flushing, partial
353   // data might be flushed and |error_callback| is invoked. If there's an error
354   // scheduling the flush operation, it returns false and neither callback will
355   // be called.
356   virtual bool FlushAsync(const base::Closure& success_callback,
357                           const ErrorCallback& error_callback,
358                           ErrorPtr* error);
359 
360   // Closes the underlying stream. The stream is also automatically closed
361   // when the stream object is destroyed, but since closing a stream is
362   // an operation that may fail, in situations when it is important to detect
363   // the failure to close the stream, CloseBlocking() should be used explicitly
364   // before destroying the stream object.
365   virtual bool CloseBlocking(ErrorPtr* error) = 0;
366 
367   // == Data availability monitoring ==========================================
368 
369   // Overloaded by derived classes to provide stream monitoring for read/write
370   // data availability for the stream. Calls |callback| when data can be read
371   // and/or written without blocking.
372   // |mode| specifies the type of operation to monitor for (read, write, both).
373   virtual bool WaitForData(AccessMode mode,
374                            const base::Callback<void(AccessMode)>& callback,
375                            ErrorPtr* error) = 0;
376 
377   // Helper function for implementing blocking I/O. Blocks until the
378   // non-blocking operation specified by |in_mode| can be performed.
379   // If |out_mode| is not nullptr, it receives the actual operation that can be
380   // performed. For example, watching a stream for READ_WRITE while only
381   // READ can be performed, |out_mode| would contain READ even though |in_mode|
382   // was set to READ_WRITE.
383   // |timeout| is the maximum amount of time to wait. Set it to TimeDelta::Max()
384   // to wait indefinitely.
385   virtual bool WaitForDataBlocking(AccessMode in_mode,
386                                    base::TimeDelta timeout,
387                                    AccessMode* out_mode,
388                                    ErrorPtr* error) = 0;
389 
390   // Cancels pending asynchronous read/write operations.
391   virtual void CancelPendingAsyncOperations();
392 
393  protected:
394   Stream() = default;
395 
396  private:
397   // Simple wrapper to call the externally exposed |success_callback| that only
398   // receives a size_t.
399   BRILLO_PRIVATE static void IgnoreEOSCallback(
400       const base::Callback<void(size_t)>& success_callback,
401       size_t read,
402       bool eos);
403 
404   // The internal implementation of ReadAsync() and ReadAllAsync().
405   // Calls ReadNonBlocking and if there's no data available waits for it calling
406   // WaitForData(). The extra |force_async_callback| tell whether the success
407   // callback should be called from the main loop instead of directly from this
408   // method. This method only calls WaitForData() if ReadNonBlocking() returns a
409   // situation in which it would block (bytes_read = 0 and eos = false),
410   // preventing us from calling WaitForData() on streams that don't support such
411   // feature.
412   BRILLO_PRIVATE bool ReadAsyncImpl(
413       void* buffer,
414       size_t size_to_read,
415       const base::Callback<void(size_t, bool)>& success_callback,
416       const ErrorCallback& error_callback,
417       ErrorPtr* error,
418       bool force_async_callback);
419 
420   // Called from the main loop when the ReadAsyncImpl finished right away
421   // without waiting for data. We use this callback to call the
422   // |sucess_callback| but invalidate the callback if the Stream is destroyed
423   // while this call is waiting in the main loop.
424   BRILLO_PRIVATE void OnReadAsyncDone(
425       const base::Callback<void(size_t, bool)>& success_callback,
426       size_t bytes_read,
427       bool eos);
428 
429   // Called from WaitForData() when read operations can be performed
430   // without blocking (the type of operation is provided in |mode|).
431   BRILLO_PRIVATE void OnReadAvailable(
432       void* buffer,
433       size_t size_to_read,
434       const base::Callback<void(size_t, bool)>& success_callback,
435       const ErrorCallback& error_callback,
436       AccessMode mode);
437 
438   // The internal implementation of WriteAsync() and WriteAllAsync().
439   // Calls WriteNonBlocking and if the write would block for it to not block
440   // calling WaitForData(). The extra |force_async_callback| tell whether the
441   // success callback should be called from the main loop instead of directly
442   // from this method. This method only calls WaitForData() if
443   // WriteNonBlocking() returns a situation in which it would block
444   // (size_written = 0 and eos = false), preventing us from calling
445   // WaitForData() on streams that don't support such feature.
446   BRILLO_PRIVATE bool WriteAsyncImpl(
447       const void* buffer,
448       size_t size_to_write,
449       const base::Callback<void(size_t)>& success_callback,
450       const ErrorCallback& error_callback,
451       ErrorPtr* error,
452       bool force_async_callback);
453 
454   // Called from the main loop when the WriteAsyncImpl finished right away
455   // without waiting for data. We use this callback to call the
456   // |sucess_callback| but invalidate the callback if the Stream is destroyed
457   // while this call is waiting in the main loop.
458   BRILLO_PRIVATE void OnWriteAsyncDone(
459       const base::Callback<void(size_t)>& success_callback,
460       size_t size_written);
461 
462   // Called from WaitForData() when write operations can be performed
463   // without blocking (the type of operation is provided in |mode|).
464   BRILLO_PRIVATE void OnWriteAvailable(
465       const void* buffer,
466       size_t size,
467       const base::Callback<void(size_t)>& success_callback,
468       const ErrorCallback& error_callback,
469       AccessMode mode);
470 
471   // Helper callbacks to implement ReadAllAsync/WriteAllAsync.
472   BRILLO_PRIVATE void ReadAllAsyncCallback(
473       void* buffer,
474       size_t size_to_read,
475       const base::Closure& success_callback,
476       const ErrorCallback& error_callback,
477       size_t size_read,
478       bool eos);
479   BRILLO_PRIVATE void WriteAllAsyncCallback(
480       const void* buffer,
481       size_t size_to_write,
482       const base::Closure& success_callback,
483       const ErrorCallback& error_callback,
484       size_t size_written);
485 
486   // Helper callbacks to implement FlushAsync().
487   BRILLO_PRIVATE void FlushAsyncCallback(
488       const base::Closure& success_callback,
489       const ErrorCallback& error_callback);
490 
491   // Data members for asynchronous read operations.
492   bool is_async_read_pending_{false};
493 
494   // Data members for asynchronous write operations.
495   bool is_async_write_pending_{false};
496 
497   base::WeakPtrFactory<Stream> weak_ptr_factory_{this};
498   DISALLOW_COPY_AND_ASSIGN(Stream);
499 };
500 
// A smart pointer to the stream used to pass the stream object around.
// Being a std::unique_ptr, it gives its holder exclusive ownership of the
// Stream it points to.
using StreamPtr = std::unique_ptr<Stream>;
503 
504 }  // namespace brillo
505 
506 #endif  // LIBBRILLO_BRILLO_STREAMS_STREAM_H_
507