1 // Copyright (C) 2019 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #pragma once
15 #include <chrono>
16 #include <condition_variable>
17 #include <cstdint>
18 #include <cstdio>
19 #include <ios>
20 #include <mutex>
21 #include <streambuf>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 using std::chrono::milliseconds;
27 
28 namespace android {
29 namespace base {
30 namespace streams {
31 
32 // RingStreambuf - a thread safe streambuffer backed by a ring buffer.
33 // This thing acts as a sliding window over a stream of data.
34 //
35 // Usage examples:
36 //
// This creates an output stream that can hold at least 5 characters before
// it starts overwriting old characters.
39 //
40 //   RingStreambuf buf(5);
41 //   std::ostream stream(&buf);
42 //   stream << "Hello";
43 //
44 // Be very careful when using this as an input stream!
45 // - It can block when nothing is available, for up to timeout ms.
46 // - It will consume the stream (i.e. read pointers will move)
class RingStreambuf : public std::streambuf {
public:
    // Creates a stream buffer backed by a ring buffer.
    //
    // |capacity| the minimum number of chars that can be stored.
    // |timeout| the max time to wait for data when using it in a stream;
    //           defaults to the largest representable millisecond duration.
    //
    // The real capacity will be a power of 2 above |capacity|.
    // For example:
    // A capacity of 4 allows you to store 7 characters, and takes up 2^3
    // chars of storage.
    RingStreambuf(uint32_t capacity,
                  std::chrono::milliseconds timeout =
                          std::chrono::milliseconds::max());

    // Retrieves the string stored at the given offset.
    // It will block at most |timeoutMs| (by default it does not wait).
    // Returns a pair holding the offset at which the first character was
    // retrieved, and the available data.
    // This call will not modify any read pointers.
    std::pair<int, std::string> bufferAtOffset(
            std::streamsize offset,
            std::chrono::milliseconds timeoutMs = std::chrono::milliseconds(0));

    // Blocks and waits until at least |n| bytes can be written, or
    // the timeout has expired. Returns the number of bytes that
    // can be written if no other threads have written to the
    // buffer.
    //
    // Note: The return value can be less than |n|, in case of a timeout.
    // NOTE(review): presumably the timeout is the one passed to the
    // constructor; confirm against the implementation.
    std::streamsize waitForAvailableSpace(std::streamsize n);

    // The total number of bytes that can be stored in the buffer, which is
    // one less than what the backing vector reports.
    // NOTE(review): this reads std::vector::capacity(), which is allowed to
    // exceed size(); confirm the constructor sizes the vector so that the
    // two agree.
    size_t capacity() const { return mRingbuffer.capacity() - 1; }

    // Closes this stream buffer for writing, reading can succeed until
    // eof, timeouts will be set to 1ms.
    void close();

protected:
    // Implement the streambuf output interface. Note that writes can
    // overwrite existing data and will report as though all bytes have been
    // written.
    std::streamsize xsputn(const char* s, std::streamsize n) override;
    int overflow(int c = EOF) override;
    std::streamsize showmanyc() override;

    // Amount of space available for writing, without erasing
    // the previous data.
    std::streamsize showmanyw();

    // Implement the streambuf input interface. Per the class description,
    // reading can block when nothing is available and consumes the stream.
    std::streamsize xsgetn(char* s, std::streamsize n) override;
    int underflow() override;
    int uflow() override;

private:
    std::vector<char> mRingbuffer;  // Backing storage of the ring.

    uint32_t mHead{0};        // Ringbuffer write pointer (front)
    uint32_t mTail{0};        // Ringbuffer read pointer (tail)
    uint64_t mHeadOffset{0};  // Accumulated offset.
    bool mFull{false};        // Presumably set when the ring has no free slot.
    bool mClosed{false};      // Presumably set by close(); verify in the .cpp.
    std::chrono::milliseconds mTimeout;  // Max wait, see constructor.

    std::mutex mLock;
    std::condition_variable mCanRead;
};
108 
109 }  // namespace streams
110 }  // namespace base
111 }  // namespace android
112