// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <ios>
#include <mutex>
#include <streambuf>
#include <string>
#include <utility>
#include <vector>

// NOTE: a file-scope using-declaration in a header leaks `milliseconds` into
// every translation unit that includes it. It is kept only for backward
// compatibility with includers that may already rely on it; the class below
// no longer depends on it and spells out std::chrono::milliseconds.
using std::chrono::milliseconds;

namespace android {
namespace base {
namespace streams {

// RingStreambuf - a thread safe streambuffer backed by a ring buffer.
// This thing acts as a sliding window over a stream of data.
//
// Usage examples:
//
// This creates an output stream that can write at least 5 characters before
// it starts overwriting old characters.
//
// RingStreambuf buf(5);
// std::ostream stream(&buf);
// stream << "Hello";
//
// Be very careful when using this as an input stream!
// - It can block when nothing is available, for up to timeout ms.
// - It will consume the stream (i.e. read pointers will move)
class RingStreambuf : public std::streambuf {
public:
    // |capacity| the minimum number of chars that can be stored.
    // |timeout| the max time to wait for data when using it in a stream.
    // The real capacity will be a power of 2 above capacity.
    // For example:
    // A capacity of 4 allows you to store 7 characters, and takes up 2^3 = 8
    // chars of storage.
    RingStreambuf(uint32_t capacity,
                  std::chrono::milliseconds timeout =
                          std::chrono::milliseconds::max());

    // Retrieves the string stored at the given offset.
    // It will block at most timeoutMs.
    // Returns the available data, and the offset at which
    // the first character was retrieved.
    // This call will not modify any read pointers.
    std::pair<int, std::string> bufferAtOffset(
            std::streamsize offset,
            std::chrono::milliseconds timeoutMs = std::chrono::milliseconds(0));

    // Blocks and waits until at least n bytes can be written, or
    // the timeout has expired. Returns the number of bytes that
    // can be written if no other threads have written to the
    // buffer.
    //
    // Note: The return value can be less than n, in case of a timeout.
    std::streamsize waitForAvailableSpace(std::streamsize n);

    // The total number of bytes that can be stored in the buffer: one less
    // than the capacity of the backing vector.
    // NOTE(review): this reads the vector's capacity(), not its size();
    // presumably the constructor sizes the vector so both agree - confirm in
    // the implementation file.
    size_t capacity() const { return mRingbuffer.capacity() - 1; }

    // Closes this stream buffer for writing, reading can succeed until
    // eof, timeouts will be set to 1ms.
    void close();

protected:
    // Implement streambuf interface, note that writes can overwrite existing
    // data and will report as though all bytes have been written.
    std::streamsize xsputn(const char* s, std::streamsize n) override;
    int overflow(int c = EOF) override;
    std::streamsize showmanyc() override;

    // Amount of space available for writing, without erasing
    // the previous data.
    std::streamsize showmanyw();
    std::streamsize xsgetn(char* s, std::streamsize n) override;
    int underflow() override;
    int uflow() override;

private:
    std::vector<char> mRingbuffer;  // Backing storage of the ring.

    uint32_t mHead{0};        // Ringbuffer write pointer (front)
    uint32_t mTail{0};        // Ringbuffer read pointer (tail)
    uint64_t mHeadOffset{0};  // Accumulated offset.
    bool mFull{false};
    bool mClosed{false};  // Presumably set by close() - confirm in the .cpp.
    std::chrono::milliseconds mTimeout;

    // NOTE(review): presumably mLock guards the state above and mCanRead is
    // signalled when data becomes readable - confirm in the implementation.
    std::mutex mLock;
    std::condition_variable mCanRead;
};

}  // namespace streams
}  // namespace base
}  // namespace android