1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/streams.h"
6 
7 #include <base/bind.h>
8 #include <base/callback.h>
9 #include <weave/provider/task_runner.h>
10 #include <weave/stream.h>
11 
12 namespace weave {
13 
14 namespace {}  // namespace
15 
MemoryStream(const std::vector<uint8_t> & data,provider::TaskRunner * task_runner)16 MemoryStream::MemoryStream(const std::vector<uint8_t>& data,
17                            provider::TaskRunner* task_runner)
18     : data_{data}, task_runner_{task_runner} {}
19 
Read(void * buffer,size_t size_to_read,const ReadCallback & callback)20 void MemoryStream::Read(void* buffer,
21                         size_t size_to_read,
22                         const ReadCallback& callback) {
23   CHECK_LE(read_position_, data_.size());
24   size_t size_read = std::min(size_to_read, data_.size() - read_position_);
25   if (size_read > 0)
26     memcpy(buffer, data_.data() + read_position_, size_read);
27   read_position_ += size_read;
28   task_runner_->PostDelayedTask(FROM_HERE,
29                                 base::Bind(callback, size_read, nullptr), {});
30 }
31 
Write(const void * buffer,size_t size_to_write,const WriteCallback & callback)32 void MemoryStream::Write(const void* buffer,
33                          size_t size_to_write,
34                          const WriteCallback& callback) {
35   data_.insert(data_.end(), static_cast<const char*>(buffer),
36                static_cast<const char*>(buffer) + size_to_write);
37   task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {});
38 }
39 
StreamCopier(InputStream * source,OutputStream * destination)40 StreamCopier::StreamCopier(InputStream* source, OutputStream* destination)
41     : source_{source}, destination_{destination}, buffer_(4096) {}
42 
Copy(const InputStream::ReadCallback & callback)43 void StreamCopier::Copy(const InputStream::ReadCallback& callback) {
44   source_->Read(buffer_.data(), buffer_.size(),
45                 base::Bind(&StreamCopier::OnReadDone,
46                            weak_ptr_factory_.GetWeakPtr(), callback));
47 }
48 
OnReadDone(const InputStream::ReadCallback & callback,size_t size,ErrorPtr error)49 void StreamCopier::OnReadDone(const InputStream::ReadCallback& callback,
50                               size_t size,
51                               ErrorPtr error) {
52   if (error)
53     return callback.Run(0, std::move(error));
54 
55   size_done_ += size;
56   if (size) {
57     return destination_->Write(
58         buffer_.data(), size,
59         base::Bind(&StreamCopier::OnWriteDone, weak_ptr_factory_.GetWeakPtr(),
60                    callback));
61   }
62   callback.Run(size_done_, nullptr);
63 }
64 
OnWriteDone(const InputStream::ReadCallback & callback,ErrorPtr error)65 void StreamCopier::OnWriteDone(const InputStream::ReadCallback& callback,
66                                ErrorPtr error) {
67   if (error)
68     return callback.Run(size_done_, std::move(error));
69   Copy(callback);
70 }
71 
72 }  // namespace weave
73