1 /*
2  *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/memory/fifo_buffer.h"
12 
13 #include <algorithm>
14 
15 #include "rtc_base/thread.h"
16 
17 namespace rtc {
18 
FifoBuffer(size_t size)19 FifoBuffer::FifoBuffer(size_t size)
20     : state_(SS_OPEN),
21       buffer_(new char[size]),
22       buffer_length_(size),
23       data_length_(0),
24       read_position_(0),
25       owner_(Thread::Current()) {
26   // all events are done on the owner_ thread
27 }
28 
FifoBuffer(size_t size,Thread * owner)29 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
30     : state_(SS_OPEN),
31       buffer_(new char[size]),
32       buffer_length_(size),
33       data_length_(0),
34       read_position_(0),
35       owner_(owner) {
36   // all events are done on the owner_ thread
37 }
38 
~FifoBuffer()39 FifoBuffer::~FifoBuffer() {}
40 
GetBuffered(size_t * size) const41 bool FifoBuffer::GetBuffered(size_t* size) const {
42   webrtc::MutexLock lock(&mutex_);
43   *size = data_length_;
44   return true;
45 }
46 
SetCapacity(size_t size)47 bool FifoBuffer::SetCapacity(size_t size) {
48   webrtc::MutexLock lock(&mutex_);
49   if (data_length_ > size) {
50     return false;
51   }
52 
53   if (size != buffer_length_) {
54     char* buffer = new char[size];
55     const size_t copy = data_length_;
56     const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
57     memcpy(buffer, &buffer_[read_position_], tail_copy);
58     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
59     buffer_.reset(buffer);
60     read_position_ = 0;
61     buffer_length_ = size;
62   }
63   return true;
64 }
65 
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)66 StreamResult FifoBuffer::ReadOffset(void* buffer,
67                                     size_t bytes,
68                                     size_t offset,
69                                     size_t* bytes_read) {
70   webrtc::MutexLock lock(&mutex_);
71   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
72 }
73 
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)74 StreamResult FifoBuffer::WriteOffset(const void* buffer,
75                                      size_t bytes,
76                                      size_t offset,
77                                      size_t* bytes_written) {
78   webrtc::MutexLock lock(&mutex_);
79   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
80 }
81 
GetState() const82 StreamState FifoBuffer::GetState() const {
83   webrtc::MutexLock lock(&mutex_);
84   return state_;
85 }
86 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)87 StreamResult FifoBuffer::Read(void* buffer,
88                               size_t bytes,
89                               size_t* bytes_read,
90                               int* error) {
91   webrtc::MutexLock lock(&mutex_);
92   const bool was_writable = data_length_ < buffer_length_;
93   size_t copy = 0;
94   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
95 
96   if (result == SR_SUCCESS) {
97     // If read was successful then adjust the read position and number of
98     // bytes buffered.
99     read_position_ = (read_position_ + copy) % buffer_length_;
100     data_length_ -= copy;
101     if (bytes_read) {
102       *bytes_read = copy;
103     }
104 
105     // if we were full before, and now we're not, post an event
106     if (!was_writable && copy > 0) {
107       PostEvent(owner_, SE_WRITE, 0);
108     }
109   }
110   return result;
111 }
112 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)113 StreamResult FifoBuffer::Write(const void* buffer,
114                                size_t bytes,
115                                size_t* bytes_written,
116                                int* error) {
117   webrtc::MutexLock lock(&mutex_);
118 
119   const bool was_readable = (data_length_ > 0);
120   size_t copy = 0;
121   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
122 
123   if (result == SR_SUCCESS) {
124     // If write was successful then adjust the number of readable bytes.
125     data_length_ += copy;
126     if (bytes_written) {
127       *bytes_written = copy;
128     }
129 
130     // if we didn't have any data to read before, and now we do, post an event
131     if (!was_readable && copy > 0) {
132       PostEvent(owner_, SE_READ, 0);
133     }
134   }
135   return result;
136 }
137 
Close()138 void FifoBuffer::Close() {
139   webrtc::MutexLock lock(&mutex_);
140   state_ = SS_CLOSED;
141 }
142 
GetReadData(size_t * size)143 const void* FifoBuffer::GetReadData(size_t* size) {
144   webrtc::MutexLock lock(&mutex_);
145   *size = (read_position_ + data_length_ <= buffer_length_)
146               ? data_length_
147               : buffer_length_ - read_position_;
148   return &buffer_[read_position_];
149 }
150 
ConsumeReadData(size_t size)151 void FifoBuffer::ConsumeReadData(size_t size) {
152   webrtc::MutexLock lock(&mutex_);
153   RTC_DCHECK(size <= data_length_);
154   const bool was_writable = data_length_ < buffer_length_;
155   read_position_ = (read_position_ + size) % buffer_length_;
156   data_length_ -= size;
157   if (!was_writable && size > 0) {
158     PostEvent(owner_, SE_WRITE, 0);
159   }
160 }
161 
GetWriteBuffer(size_t * size)162 void* FifoBuffer::GetWriteBuffer(size_t* size) {
163   webrtc::MutexLock lock(&mutex_);
164   if (state_ == SS_CLOSED) {
165     return nullptr;
166   }
167 
168   // if empty, reset the write position to the beginning, so we can get
169   // the biggest possible block
170   if (data_length_ == 0) {
171     read_position_ = 0;
172   }
173 
174   const size_t write_position =
175       (read_position_ + data_length_) % buffer_length_;
176   *size = (write_position > read_position_ || data_length_ == 0)
177               ? buffer_length_ - write_position
178               : read_position_ - write_position;
179   return &buffer_[write_position];
180 }
181 
ConsumeWriteBuffer(size_t size)182 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
183   webrtc::MutexLock lock(&mutex_);
184   RTC_DCHECK(size <= buffer_length_ - data_length_);
185   const bool was_readable = (data_length_ > 0);
186   data_length_ += size;
187   if (!was_readable && size > 0) {
188     PostEvent(owner_, SE_READ, 0);
189   }
190 }
191 
GetWriteRemaining(size_t * size) const192 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
193   webrtc::MutexLock lock(&mutex_);
194   *size = buffer_length_ - data_length_;
195   return true;
196 }
197 
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)198 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
199                                           size_t bytes,
200                                           size_t offset,
201                                           size_t* bytes_read) {
202   if (offset >= data_length_) {
203     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
204   }
205 
206   const size_t available = data_length_ - offset;
207   const size_t read_position = (read_position_ + offset) % buffer_length_;
208   const size_t copy = std::min(bytes, available);
209   const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
210   char* const p = static_cast<char*>(buffer);
211   memcpy(p, &buffer_[read_position], tail_copy);
212   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
213 
214   if (bytes_read) {
215     *bytes_read = copy;
216   }
217   return SR_SUCCESS;
218 }
219 
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)220 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
221                                            size_t bytes,
222                                            size_t offset,
223                                            size_t* bytes_written) {
224   if (state_ == SS_CLOSED) {
225     return SR_EOS;
226   }
227 
228   if (data_length_ + offset >= buffer_length_) {
229     return SR_BLOCK;
230   }
231 
232   const size_t available = buffer_length_ - data_length_ - offset;
233   const size_t write_position =
234       (read_position_ + data_length_ + offset) % buffer_length_;
235   const size_t copy = std::min(bytes, available);
236   const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
237   const char* const p = static_cast<const char*>(buffer);
238   memcpy(&buffer_[write_position], p, tail_copy);
239   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
240 
241   if (bytes_written) {
242     *bytes_written = copy;
243   }
244   return SR_SUCCESS;
245 }
246 
247 }  // namespace rtc
248