1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(WEBRTC_POSIX)
12 #include <sys/file.h>
13 #endif  // WEBRTC_POSIX
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <errno.h>
17 
18 #include <algorithm>
19 #include <string>
20 
21 #include "webrtc/base/basictypes.h"
22 #include "webrtc/base/common.h"
23 #include "webrtc/base/logging.h"
24 #include "webrtc/base/messagequeue.h"
25 #include "webrtc/base/stream.h"
26 #include "webrtc/base/stringencode.h"
27 #include "webrtc/base/stringutils.h"
28 #include "webrtc/base/thread.h"
29 #include "webrtc/base/timeutils.h"
30 
31 #if defined(WEBRTC_WIN)
32 #include "webrtc/base/win32.h"
33 #define fileno _fileno
34 #endif
35 
36 namespace rtc {
37 
38 ///////////////////////////////////////////////////////////////////////////////
39 // StreamInterface
40 ///////////////////////////////////////////////////////////////////////////////
~StreamInterface()41 StreamInterface::~StreamInterface() {
42 }
43 
WriteAll(const void * data,size_t data_len,size_t * written,int * error)44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
45                                        size_t* written, int* error) {
46   StreamResult result = SR_SUCCESS;
47   size_t total_written = 0, current_written;
48   while (total_written < data_len) {
49     result = Write(static_cast<const char*>(data) + total_written,
50                    data_len - total_written, &current_written, error);
51     if (result != SR_SUCCESS)
52       break;
53     total_written += current_written;
54   }
55   if (written)
56     *written = total_written;
57   return result;
58 }
59 
ReadAll(void * buffer,size_t buffer_len,size_t * read,int * error)60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
61                                       size_t* read, int* error) {
62   StreamResult result = SR_SUCCESS;
63   size_t total_read = 0, current_read;
64   while (total_read < buffer_len) {
65     result = Read(static_cast<char*>(buffer) + total_read,
66                   buffer_len - total_read, &current_read, error);
67     if (result != SR_SUCCESS)
68       break;
69     total_read += current_read;
70   }
71   if (read)
72     *read = total_read;
73   return result;
74 }
75 
ReadLine(std::string * line)76 StreamResult StreamInterface::ReadLine(std::string* line) {
77   line->clear();
78   StreamResult result = SR_SUCCESS;
79   while (true) {
80     char ch;
81     result = Read(&ch, sizeof(ch), NULL, NULL);
82     if (result != SR_SUCCESS) {
83       break;
84     }
85     if (ch == '\n') {
86       break;
87     }
88     line->push_back(ch);
89   }
90   if (!line->empty()) {   // give back the line we've collected so far with
91     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
92   }
93   return result;
94 }
95 
PostEvent(Thread * t,int events,int err)96 void StreamInterface::PostEvent(Thread* t, int events, int err) {
97   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
98 }
99 
PostEvent(int events,int err)100 void StreamInterface::PostEvent(int events, int err) {
101   PostEvent(Thread::Current(), events, err);
102 }
103 
GetReadData(size_t * data_len)104 const void* StreamInterface::GetReadData(size_t* data_len) {
105   return NULL;
106 }
107 
GetWriteBuffer(size_t * buf_len)108 void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
109   return NULL;
110 }
111 
SetPosition(size_t position)112 bool StreamInterface::SetPosition(size_t position) {
113   return false;
114 }
115 
GetPosition(size_t * position) const116 bool StreamInterface::GetPosition(size_t* position) const {
117   return false;
118 }
119 
GetSize(size_t * size) const120 bool StreamInterface::GetSize(size_t* size) const {
121   return false;
122 }
123 
GetAvailable(size_t * size) const124 bool StreamInterface::GetAvailable(size_t* size) const {
125   return false;
126 }
127 
GetWriteRemaining(size_t * size) const128 bool StreamInterface::GetWriteRemaining(size_t* size) const {
129   return false;
130 }
131 
Flush()132 bool StreamInterface::Flush() {
133   return false;
134 }
135 
ReserveSize(size_t size)136 bool StreamInterface::ReserveSize(size_t size) {
137   return true;
138 }
139 
StreamInterface()140 StreamInterface::StreamInterface() {
141 }
142 
OnMessage(Message * msg)143 void StreamInterface::OnMessage(Message* msg) {
144   if (MSG_POST_EVENT == msg->message_id) {
145     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
146     SignalEvent(this, pe->events, pe->error);
147     delete msg->pdata;
148   }
149 }
150 
151 ///////////////////////////////////////////////////////////////////////////////
152 // StreamAdapterInterface
153 ///////////////////////////////////////////////////////////////////////////////
154 
StreamAdapterInterface(StreamInterface * stream,bool owned)155 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
156                                                bool owned)
157     : stream_(stream), owned_(owned) {
158   if (NULL != stream_)
159     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
160 }
161 
GetState() const162 StreamState StreamAdapterInterface::GetState() const {
163   return stream_->GetState();
164 }
Read(void * buffer,size_t buffer_len,size_t * read,int * error)165 StreamResult StreamAdapterInterface::Read(void* buffer,
166                                           size_t buffer_len,
167                                           size_t* read,
168                                           int* error) {
169   return stream_->Read(buffer, buffer_len, read, error);
170 }
Write(const void * data,size_t data_len,size_t * written,int * error)171 StreamResult StreamAdapterInterface::Write(const void* data,
172                                            size_t data_len,
173                                            size_t* written,
174                                            int* error) {
175   return stream_->Write(data, data_len, written, error);
176 }
Close()177 void StreamAdapterInterface::Close() {
178   stream_->Close();
179 }
180 
SetPosition(size_t position)181 bool StreamAdapterInterface::SetPosition(size_t position) {
182   return stream_->SetPosition(position);
183 }
184 
GetPosition(size_t * position) const185 bool StreamAdapterInterface::GetPosition(size_t* position) const {
186   return stream_->GetPosition(position);
187 }
188 
GetSize(size_t * size) const189 bool StreamAdapterInterface::GetSize(size_t* size) const {
190   return stream_->GetSize(size);
191 }
192 
GetAvailable(size_t * size) const193 bool StreamAdapterInterface::GetAvailable(size_t* size) const {
194   return stream_->GetAvailable(size);
195 }
196 
GetWriteRemaining(size_t * size) const197 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
198   return stream_->GetWriteRemaining(size);
199 }
200 
ReserveSize(size_t size)201 bool StreamAdapterInterface::ReserveSize(size_t size) {
202   return stream_->ReserveSize(size);
203 }
204 
Flush()205 bool StreamAdapterInterface::Flush() {
206   return stream_->Flush();
207 }
208 
Attach(StreamInterface * stream,bool owned)209 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
210   if (NULL != stream_)
211     stream_->SignalEvent.disconnect(this);
212   if (owned_)
213     delete stream_;
214   stream_ = stream;
215   owned_ = owned;
216   if (NULL != stream_)
217     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
218 }
219 
Detach()220 StreamInterface* StreamAdapterInterface::Detach() {
221   if (NULL != stream_)
222     stream_->SignalEvent.disconnect(this);
223   StreamInterface* stream = stream_;
224   stream_ = NULL;
225   return stream;
226 }
227 
~StreamAdapterInterface()228 StreamAdapterInterface::~StreamAdapterInterface() {
229   if (owned_)
230     delete stream_;
231 }
232 
OnEvent(StreamInterface * stream,int events,int err)233 void StreamAdapterInterface::OnEvent(StreamInterface* stream,
234                                      int events,
235                                      int err) {
236   SignalEvent(this, events, err);
237 }
238 
239 ///////////////////////////////////////////////////////////////////////////////
240 // StreamTap
241 ///////////////////////////////////////////////////////////////////////////////
242 
StreamTap(StreamInterface * stream,StreamInterface * tap)243 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
244     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
245         tap_error_(0) {
246   AttachTap(tap);
247 }
248 
249 StreamTap::~StreamTap() = default;
250 
AttachTap(StreamInterface * tap)251 void StreamTap::AttachTap(StreamInterface* tap) {
252   tap_.reset(tap);
253 }
254 
DetachTap()255 StreamInterface* StreamTap::DetachTap() {
256   return tap_.release();
257 }
258 
GetTapResult(int * error)259 StreamResult StreamTap::GetTapResult(int* error) {
260   if (error) {
261     *error = tap_error_;
262   }
263   return tap_result_;
264 }
265 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)266 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
267                              size_t* read, int* error) {
268   size_t backup_read;
269   if (!read) {
270     read = &backup_read;
271   }
272   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
273                                                   read, error);
274   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
275     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
276   }
277   return res;
278 }
279 
Write(const void * data,size_t data_len,size_t * written,int * error)280 StreamResult StreamTap::Write(const void* data, size_t data_len,
281                               size_t* written, int* error) {
282   size_t backup_written;
283   if (!written) {
284     written = &backup_written;
285   }
286   StreamResult res = StreamAdapterInterface::Write(data, data_len,
287                                                    written, error);
288   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
289     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
290   }
291   return res;
292 }
293 
294 ///////////////////////////////////////////////////////////////////////////////
295 // NullStream
296 ///////////////////////////////////////////////////////////////////////////////
297 
NullStream()298 NullStream::NullStream() {
299 }
300 
~NullStream()301 NullStream::~NullStream() {
302 }
303 
GetState() const304 StreamState NullStream::GetState() const {
305   return SS_OPEN;
306 }
307 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)308 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
309                               size_t* read, int* error) {
310   if (error) *error = -1;
311   return SR_ERROR;
312 }
313 
Write(const void * data,size_t data_len,size_t * written,int * error)314 StreamResult NullStream::Write(const void* data, size_t data_len,
315                                size_t* written, int* error) {
316   if (written) *written = data_len;
317   return SR_SUCCESS;
318 }
319 
Close()320 void NullStream::Close() {
321 }
322 
323 ///////////////////////////////////////////////////////////////////////////////
324 // FileStream
325 ///////////////////////////////////////////////////////////////////////////////
326 
FileStream()327 FileStream::FileStream() : file_(NULL) {
328 }
329 
~FileStream()330 FileStream::~FileStream() {
331   FileStream::Close();
332 }
333 
Open(const std::string & filename,const char * mode,int * error)334 bool FileStream::Open(const std::string& filename, const char* mode,
335                       int* error) {
336   Close();
337 #if defined(WEBRTC_WIN)
338   std::wstring wfilename;
339   if (Utf8ToWindowsFilename(filename, &wfilename)) {
340     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
341   } else {
342     if (error) {
343       *error = -1;
344       return false;
345     }
346   }
347 #else
348   file_ = fopen(filename.c_str(), mode);
349 #endif
350   if (!file_ && error) {
351     *error = errno;
352   }
353   return (file_ != NULL);
354 }
355 
OpenShare(const std::string & filename,const char * mode,int shflag,int * error)356 bool FileStream::OpenShare(const std::string& filename, const char* mode,
357                            int shflag, int* error) {
358   Close();
359 #if defined(WEBRTC_WIN)
360   std::wstring wfilename;
361   if (Utf8ToWindowsFilename(filename, &wfilename)) {
362     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
363     if (!file_ && error) {
364       *error = errno;
365       return false;
366     }
367     return file_ != NULL;
368   } else {
369     if (error) {
370       *error = -1;
371     }
372     return false;
373   }
374 #else
375   return Open(filename, mode, error);
376 #endif
377 }
378 
DisableBuffering()379 bool FileStream::DisableBuffering() {
380   if (!file_)
381     return false;
382   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
383 }
384 
GetState() const385 StreamState FileStream::GetState() const {
386   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
387 }
388 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)389 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
390                               size_t* read, int* error) {
391   if (!file_)
392     return SR_EOS;
393   size_t result = fread(buffer, 1, buffer_len, file_);
394   if ((result == 0) && (buffer_len > 0)) {
395     if (feof(file_))
396       return SR_EOS;
397     if (error)
398       *error = errno;
399     return SR_ERROR;
400   }
401   if (read)
402     *read = result;
403   return SR_SUCCESS;
404 }
405 
Write(const void * data,size_t data_len,size_t * written,int * error)406 StreamResult FileStream::Write(const void* data, size_t data_len,
407                                size_t* written, int* error) {
408   if (!file_)
409     return SR_EOS;
410   size_t result = fwrite(data, 1, data_len, file_);
411   if ((result == 0) && (data_len > 0)) {
412     if (error)
413       *error = errno;
414     return SR_ERROR;
415   }
416   if (written)
417     *written = result;
418   return SR_SUCCESS;
419 }
420 
Close()421 void FileStream::Close() {
422   if (file_) {
423     DoClose();
424     file_ = NULL;
425   }
426 }
427 
SetPosition(size_t position)428 bool FileStream::SetPosition(size_t position) {
429   if (!file_)
430     return false;
431   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
432 }
433 
GetPosition(size_t * position) const434 bool FileStream::GetPosition(size_t* position) const {
435   ASSERT(NULL != position);
436   if (!file_)
437     return false;
438   long result = ftell(file_);
439   if (result < 0)
440     return false;
441   if (position)
442     *position = result;
443   return true;
444 }
445 
GetSize(size_t * size) const446 bool FileStream::GetSize(size_t* size) const {
447   ASSERT(NULL != size);
448   if (!file_)
449     return false;
450   struct stat file_stats;
451   if (fstat(fileno(file_), &file_stats) != 0)
452     return false;
453   if (size)
454     *size = file_stats.st_size;
455   return true;
456 }
457 
GetAvailable(size_t * size) const458 bool FileStream::GetAvailable(size_t* size) const {
459   ASSERT(NULL != size);
460   if (!GetSize(size))
461     return false;
462   long result = ftell(file_);
463   if (result < 0)
464     return false;
465   if (size)
466     *size -= result;
467   return true;
468 }
469 
ReserveSize(size_t size)470 bool FileStream::ReserveSize(size_t size) {
471   // TODO: extend the file to the proper length
472   return true;
473 }
474 
GetSize(const std::string & filename,size_t * size)475 bool FileStream::GetSize(const std::string& filename, size_t* size) {
476   struct stat file_stats;
477   if (stat(filename.c_str(), &file_stats) != 0)
478     return false;
479   *size = file_stats.st_size;
480   return true;
481 }
482 
Flush()483 bool FileStream::Flush() {
484   if (file_) {
485     return (0 == fflush(file_));
486   }
487   // try to flush empty file?
488   ASSERT(false);
489   return false;
490 }
491 
492 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
493 
TryLock()494 bool FileStream::TryLock() {
495   if (file_ == NULL) {
496     // Stream not open.
497     ASSERT(false);
498     return false;
499   }
500 
501   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
502 }
503 
Unlock()504 bool FileStream::Unlock() {
505   if (file_ == NULL) {
506     // Stream not open.
507     ASSERT(false);
508     return false;
509   }
510 
511   return flock(fileno(file_), LOCK_UN) == 0;
512 }
513 
514 #endif
515 
DoClose()516 void FileStream::DoClose() {
517   fclose(file_);
518 }
519 
520 ///////////////////////////////////////////////////////////////////////////////
521 // MemoryStream
522 ///////////////////////////////////////////////////////////////////////////////
523 
MemoryStreamBase()524 MemoryStreamBase::MemoryStreamBase()
525   : buffer_(NULL), buffer_length_(0), data_length_(0),
526     seek_position_(0) {
527 }
528 
GetState() const529 StreamState MemoryStreamBase::GetState() const {
530   return SS_OPEN;
531 }
532 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)533 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
534                                     size_t* bytes_read, int* error) {
535   if (seek_position_ >= data_length_) {
536     return SR_EOS;
537   }
538   size_t available = data_length_ - seek_position_;
539   if (bytes > available) {
540     // Read partial buffer
541     bytes = available;
542   }
543   memcpy(buffer, &buffer_[seek_position_], bytes);
544   seek_position_ += bytes;
545   if (bytes_read) {
546     *bytes_read = bytes;
547   }
548   return SR_SUCCESS;
549 }
550 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)551 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
552                                      size_t* bytes_written, int* error) {
553   size_t available = buffer_length_ - seek_position_;
554   if (0 == available) {
555     // Increase buffer size to the larger of:
556     // a) new position rounded up to next 256 bytes
557     // b) double the previous length
558     size_t new_buffer_length =
559         std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
560     StreamResult result = DoReserve(new_buffer_length, error);
561     if (SR_SUCCESS != result) {
562       return result;
563     }
564     ASSERT(buffer_length_ >= new_buffer_length);
565     available = buffer_length_ - seek_position_;
566   }
567 
568   if (bytes > available) {
569     bytes = available;
570   }
571   memcpy(&buffer_[seek_position_], buffer, bytes);
572   seek_position_ += bytes;
573   if (data_length_ < seek_position_) {
574     data_length_ = seek_position_;
575   }
576   if (bytes_written) {
577     *bytes_written = bytes;
578   }
579   return SR_SUCCESS;
580 }
581 
Close()582 void MemoryStreamBase::Close() {
583   // nothing to do
584 }
585 
SetPosition(size_t position)586 bool MemoryStreamBase::SetPosition(size_t position) {
587   if (position > data_length_)
588     return false;
589   seek_position_ = position;
590   return true;
591 }
592 
GetPosition(size_t * position) const593 bool MemoryStreamBase::GetPosition(size_t* position) const {
594   if (position)
595     *position = seek_position_;
596   return true;
597 }
598 
GetSize(size_t * size) const599 bool MemoryStreamBase::GetSize(size_t* size) const {
600   if (size)
601     *size = data_length_;
602   return true;
603 }
604 
GetAvailable(size_t * size) const605 bool MemoryStreamBase::GetAvailable(size_t* size) const {
606   if (size)
607     *size = data_length_ - seek_position_;
608   return true;
609 }
610 
ReserveSize(size_t size)611 bool MemoryStreamBase::ReserveSize(size_t size) {
612   return (SR_SUCCESS == DoReserve(size, NULL));
613 }
614 
DoReserve(size_t size,int * error)615 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
616   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
617 }
618 
619 ///////////////////////////////////////////////////////////////////////////////
620 
MemoryStream()621 MemoryStream::MemoryStream()
622   : buffer_alloc_(NULL) {
623 }
624 
MemoryStream(const char * data)625 MemoryStream::MemoryStream(const char* data)
626   : buffer_alloc_(NULL) {
627   SetData(data, strlen(data));
628 }
629 
MemoryStream(const void * data,size_t length)630 MemoryStream::MemoryStream(const void* data, size_t length)
631   : buffer_alloc_(NULL) {
632   SetData(data, length);
633 }
634 
~MemoryStream()635 MemoryStream::~MemoryStream() {
636   delete [] buffer_alloc_;
637 }
638 
SetData(const void * data,size_t length)639 void MemoryStream::SetData(const void* data, size_t length) {
640   data_length_ = buffer_length_ = length;
641   delete [] buffer_alloc_;
642   buffer_alloc_ = new char[buffer_length_ + kAlignment];
643   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
644   memcpy(buffer_, data, data_length_);
645   seek_position_ = 0;
646 }
647 
DoReserve(size_t size,int * error)648 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
649   if (buffer_length_ >= size)
650     return SR_SUCCESS;
651 
652   if (char* new_buffer_alloc = new char[size + kAlignment]) {
653     char* new_buffer = reinterpret_cast<char*>(
654         ALIGNP(new_buffer_alloc, kAlignment));
655     memcpy(new_buffer, buffer_, data_length_);
656     delete [] buffer_alloc_;
657     buffer_alloc_ = new_buffer_alloc;
658     buffer_ = new_buffer;
659     buffer_length_ = size;
660     return SR_SUCCESS;
661   }
662 
663   if (error) {
664     *error = ENOMEM;
665   }
666   return SR_ERROR;
667 }
668 
669 ///////////////////////////////////////////////////////////////////////////////
670 
ExternalMemoryStream()671 ExternalMemoryStream::ExternalMemoryStream() {
672 }
673 
ExternalMemoryStream(void * data,size_t length)674 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
675   SetData(data, length);
676 }
677 
~ExternalMemoryStream()678 ExternalMemoryStream::~ExternalMemoryStream() {
679 }
680 
SetData(void * data,size_t length)681 void ExternalMemoryStream::SetData(void* data, size_t length) {
682   data_length_ = buffer_length_ = length;
683   buffer_ = static_cast<char*>(data);
684   seek_position_ = 0;
685 }
686 
687 ///////////////////////////////////////////////////////////////////////////////
688 // FifoBuffer
689 ///////////////////////////////////////////////////////////////////////////////
690 
FifoBuffer(size_t size)691 FifoBuffer::FifoBuffer(size_t size)
692     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
693       data_length_(0), read_position_(0), owner_(Thread::Current()) {
694   // all events are done on the owner_ thread
695 }
696 
FifoBuffer(size_t size,Thread * owner)697 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
698     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
699       data_length_(0), read_position_(0), owner_(owner) {
700   // all events are done on the owner_ thread
701 }
702 
~FifoBuffer()703 FifoBuffer::~FifoBuffer() {
704 }
705 
GetBuffered(size_t * size) const706 bool FifoBuffer::GetBuffered(size_t* size) const {
707   CritScope cs(&crit_);
708   *size = data_length_;
709   return true;
710 }
711 
SetCapacity(size_t size)712 bool FifoBuffer::SetCapacity(size_t size) {
713   CritScope cs(&crit_);
714   if (data_length_ > size) {
715     return false;
716   }
717 
718   if (size != buffer_length_) {
719     char* buffer = new char[size];
720     const size_t copy = data_length_;
721     const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
722     memcpy(buffer, &buffer_[read_position_], tail_copy);
723     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
724     buffer_.reset(buffer);
725     read_position_ = 0;
726     buffer_length_ = size;
727   }
728   return true;
729 }
730 
ReadOffset(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)731 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
732                                     size_t offset, size_t* bytes_read) {
733   CritScope cs(&crit_);
734   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
735 }
736 
WriteOffset(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)737 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
738                                      size_t offset, size_t* bytes_written) {
739   CritScope cs(&crit_);
740   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
741 }
742 
GetState() const743 StreamState FifoBuffer::GetState() const {
744   return state_;
745 }
746 
Read(void * buffer,size_t bytes,size_t * bytes_read,int * error)747 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
748                               size_t* bytes_read, int* error) {
749   CritScope cs(&crit_);
750   const bool was_writable = data_length_ < buffer_length_;
751   size_t copy = 0;
752   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
753 
754   if (result == SR_SUCCESS) {
755     // If read was successful then adjust the read position and number of
756     // bytes buffered.
757     read_position_ = (read_position_ + copy) % buffer_length_;
758     data_length_ -= copy;
759     if (bytes_read) {
760       *bytes_read = copy;
761     }
762 
763     // if we were full before, and now we're not, post an event
764     if (!was_writable && copy > 0) {
765       PostEvent(owner_, SE_WRITE, 0);
766     }
767   }
768   return result;
769 }
770 
Write(const void * buffer,size_t bytes,size_t * bytes_written,int * error)771 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
772                                size_t* bytes_written, int* error) {
773   CritScope cs(&crit_);
774 
775   const bool was_readable = (data_length_ > 0);
776   size_t copy = 0;
777   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
778 
779   if (result == SR_SUCCESS) {
780     // If write was successful then adjust the number of readable bytes.
781     data_length_ += copy;
782     if (bytes_written) {
783       *bytes_written = copy;
784     }
785 
786     // if we didn't have any data to read before, and now we do, post an event
787     if (!was_readable && copy > 0) {
788       PostEvent(owner_, SE_READ, 0);
789     }
790   }
791   return result;
792 }
793 
Close()794 void FifoBuffer::Close() {
795   CritScope cs(&crit_);
796   state_ = SS_CLOSED;
797 }
798 
GetReadData(size_t * size)799 const void* FifoBuffer::GetReadData(size_t* size) {
800   CritScope cs(&crit_);
801   *size = (read_position_ + data_length_ <= buffer_length_) ?
802       data_length_ : buffer_length_ - read_position_;
803   return &buffer_[read_position_];
804 }
805 
ConsumeReadData(size_t size)806 void FifoBuffer::ConsumeReadData(size_t size) {
807   CritScope cs(&crit_);
808   ASSERT(size <= data_length_);
809   const bool was_writable = data_length_ < buffer_length_;
810   read_position_ = (read_position_ + size) % buffer_length_;
811   data_length_ -= size;
812   if (!was_writable && size > 0) {
813     PostEvent(owner_, SE_WRITE, 0);
814   }
815 }
816 
GetWriteBuffer(size_t * size)817 void* FifoBuffer::GetWriteBuffer(size_t* size) {
818   CritScope cs(&crit_);
819   if (state_ == SS_CLOSED) {
820     return NULL;
821   }
822 
823   // if empty, reset the write position to the beginning, so we can get
824   // the biggest possible block
825   if (data_length_ == 0) {
826     read_position_ = 0;
827   }
828 
829   const size_t write_position = (read_position_ + data_length_)
830       % buffer_length_;
831   *size = (write_position > read_position_ || data_length_ == 0) ?
832       buffer_length_ - write_position : read_position_ - write_position;
833   return &buffer_[write_position];
834 }
835 
ConsumeWriteBuffer(size_t size)836 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
837   CritScope cs(&crit_);
838   ASSERT(size <= buffer_length_ - data_length_);
839   const bool was_readable = (data_length_ > 0);
840   data_length_ += size;
841   if (!was_readable && size > 0) {
842     PostEvent(owner_, SE_READ, 0);
843   }
844 }
845 
GetWriteRemaining(size_t * size) const846 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
847   CritScope cs(&crit_);
848   *size = buffer_length_ - data_length_;
849   return true;
850 }
851 
ReadOffsetLocked(void * buffer,size_t bytes,size_t offset,size_t * bytes_read)852 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
853                                           size_t bytes,
854                                           size_t offset,
855                                           size_t* bytes_read) {
856   if (offset >= data_length_) {
857     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
858   }
859 
860   const size_t available = data_length_ - offset;
861   const size_t read_position = (read_position_ + offset) % buffer_length_;
862   const size_t copy = std::min(bytes, available);
863   const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
864   char* const p = static_cast<char*>(buffer);
865   memcpy(p, &buffer_[read_position], tail_copy);
866   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
867 
868   if (bytes_read) {
869     *bytes_read = copy;
870   }
871   return SR_SUCCESS;
872 }
873 
WriteOffsetLocked(const void * buffer,size_t bytes,size_t offset,size_t * bytes_written)874 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
875                                            size_t bytes,
876                                            size_t offset,
877                                            size_t* bytes_written) {
878   if (state_ == SS_CLOSED) {
879     return SR_EOS;
880   }
881 
882   if (data_length_ + offset >= buffer_length_) {
883     return SR_BLOCK;
884   }
885 
886   const size_t available = buffer_length_ - data_length_ - offset;
887   const size_t write_position = (read_position_ + data_length_ + offset)
888       % buffer_length_;
889   const size_t copy = std::min(bytes, available);
890   const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
891   const char* const p = static_cast<const char*>(buffer);
892   memcpy(&buffer_[write_position], p, tail_copy);
893   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
894 
895   if (bytes_written) {
896     *bytes_written = copy;
897   }
898   return SR_SUCCESS;
899 }
900 
901 
902 
903 ///////////////////////////////////////////////////////////////////////////////
904 // LoggingAdapter
905 ///////////////////////////////////////////////////////////////////////////////
906 
LoggingAdapter(StreamInterface * stream,LoggingSeverity level,const std::string & label,bool hex_mode)907 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
908                                const std::string& label, bool hex_mode)
909     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
910   set_label(label);
911 }
912 
set_label(const std::string & label)913 void LoggingAdapter::set_label(const std::string& label) {
914   label_.assign("[");
915   label_.append(label);
916   label_.append("]");
917 }
918 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)919 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
920                                   size_t* read, int* error) {
921   size_t local_read; if (!read) read = &local_read;
922   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
923                                                      error);
924   if (result == SR_SUCCESS) {
925     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
926   }
927   return result;
928 }
929 
Write(const void * data,size_t data_len,size_t * written,int * error)930 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
931                                    size_t* written, int* error) {
932   size_t local_written;
933   if (!written) written = &local_written;
934   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
935                                                       error);
936   if (result == SR_SUCCESS) {
937     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
938                  &lms_);
939   }
940   return result;
941 }
942 
Close()943 void LoggingAdapter::Close() {
944   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
945   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
946   LOG_V(level_) << label_ << " Closed locally";
947   StreamAdapterInterface::Close();
948 }
949 
OnEvent(StreamInterface * stream,int events,int err)950 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
951   if (events & SE_OPEN) {
952     LOG_V(level_) << label_ << " Open";
953   } else if (events & SE_CLOSE) {
954     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
955     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
956     LOG_V(level_) << label_ << " Closed with error: " << err;
957   }
958   StreamAdapterInterface::OnEvent(stream, events, err);
959 }
960 
961 ///////////////////////////////////////////////////////////////////////////////
962 // StringStream - Reads/Writes to an external std::string
963 ///////////////////////////////////////////////////////////////////////////////
964 
StringStream(std::string * str)965 StringStream::StringStream(std::string* str)
966     : str_(*str), read_pos_(0), read_only_(false) {
967 }
968 
StringStream(const std::string & str)969 StringStream::StringStream(const std::string& str)
970     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
971 }
972 
GetState() const973 StreamState StringStream::GetState() const {
974   return SS_OPEN;
975 }
976 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)977 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
978                                       size_t* read, int* error) {
979   size_t available = std::min(buffer_len, str_.size() - read_pos_);
980   if (!available)
981     return SR_EOS;
982   memcpy(buffer, str_.data() + read_pos_, available);
983   read_pos_ += available;
984   if (read)
985     *read = available;
986   return SR_SUCCESS;
987 }
988 
Write(const void * data,size_t data_len,size_t * written,int * error)989 StreamResult StringStream::Write(const void* data, size_t data_len,
990                                       size_t* written, int* error) {
991   if (read_only_) {
992     if (error) {
993       *error = -1;
994     }
995     return SR_ERROR;
996   }
997   str_.append(static_cast<const char*>(data),
998               static_cast<const char*>(data) + data_len);
999   if (written)
1000     *written = data_len;
1001   return SR_SUCCESS;
1002 }
1003 
Close()1004 void StringStream::Close() {
1005 }
1006 
SetPosition(size_t position)1007 bool StringStream::SetPosition(size_t position) {
1008   if (position > str_.size())
1009     return false;
1010   read_pos_ = position;
1011   return true;
1012 }
1013 
GetPosition(size_t * position) const1014 bool StringStream::GetPosition(size_t* position) const {
1015   if (position)
1016     *position = read_pos_;
1017   return true;
1018 }
1019 
GetSize(size_t * size) const1020 bool StringStream::GetSize(size_t* size) const {
1021   if (size)
1022     *size = str_.size();
1023   return true;
1024 }
1025 
GetAvailable(size_t * size) const1026 bool StringStream::GetAvailable(size_t* size) const {
1027   if (size)
1028     *size = str_.size() - read_pos_;
1029   return true;
1030 }
1031 
ReserveSize(size_t size)1032 bool StringStream::ReserveSize(size_t size) {
1033   if (read_only_)
1034     return false;
1035   str_.reserve(size);
1036   return true;
1037 }
1038 
1039 ///////////////////////////////////////////////////////////////////////////////
1040 // StreamReference
1041 ///////////////////////////////////////////////////////////////////////////////
1042 
StreamReference(StreamInterface * stream)1043 StreamReference::StreamReference(StreamInterface* stream)
1044     : StreamAdapterInterface(stream, false) {
1045   // owner set to false so the destructor does not free the stream.
1046   stream_ref_count_ = new StreamRefCount(stream);
1047 }
1048 
NewReference()1049 StreamInterface* StreamReference::NewReference() {
1050   stream_ref_count_->AddReference();
1051   return new StreamReference(stream_ref_count_, stream());
1052 }
1053 
~StreamReference()1054 StreamReference::~StreamReference() {
1055   stream_ref_count_->Release();
1056 }
1057 
StreamReference(StreamRefCount * stream_ref_count,StreamInterface * stream)1058 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1059                                  StreamInterface* stream)
1060     : StreamAdapterInterface(stream, false),
1061       stream_ref_count_(stream_ref_count) {
1062 }
1063 
1064 ///////////////////////////////////////////////////////////////////////////////
1065 
Flow(StreamInterface * source,char * buffer,size_t buffer_len,StreamInterface * sink,size_t * data_len)1066 StreamResult Flow(StreamInterface* source,
1067                   char* buffer, size_t buffer_len,
1068                   StreamInterface* sink,
1069                   size_t* data_len /* = NULL */) {
1070   ASSERT(buffer_len > 0);
1071 
1072   StreamResult result;
1073   size_t count, read_pos, write_pos;
1074   if (data_len) {
1075     read_pos = *data_len;
1076   } else {
1077     read_pos = 0;
1078   }
1079 
1080   bool end_of_stream = false;
1081   do {
1082     // Read until buffer is full, end of stream, or error
1083     while (!end_of_stream && (read_pos < buffer_len)) {
1084       result = source->Read(buffer + read_pos, buffer_len - read_pos,
1085                             &count, NULL);
1086       if (result == SR_EOS) {
1087         end_of_stream = true;
1088       } else if (result != SR_SUCCESS) {
1089         if (data_len) {
1090           *data_len = read_pos;
1091         }
1092         return result;
1093       } else {
1094         read_pos += count;
1095       }
1096     }
1097 
1098     // Write until buffer is empty, or error (including end of stream)
1099     write_pos = 0;
1100     while (write_pos < read_pos) {
1101       result = sink->Write(buffer + write_pos, read_pos - write_pos,
1102                            &count, NULL);
1103       if (result != SR_SUCCESS) {
1104         if (data_len) {
1105           *data_len = read_pos - write_pos;
1106           if (write_pos > 0) {
1107             memmove(buffer, buffer + write_pos, *data_len);
1108           }
1109         }
1110         return result;
1111       }
1112       write_pos += count;
1113     }
1114 
1115     read_pos = 0;
1116   } while (!end_of_stream);
1117 
1118   if (data_len) {
1119     *data_len = 0;
1120   }
1121   return SR_SUCCESS;
1122 }
1123 
1124 ///////////////////////////////////////////////////////////////////////////////
1125 
1126 }  // namespace rtc
1127