1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/streams.h"
6
7 #include <base/bind.h>
8 #include <base/callback.h>
9 #include <weave/provider/task_runner.h>
10 #include <weave/stream.h>
11
12 namespace weave {
13
14 namespace {} // namespace
15
MemoryStream(const std::vector<uint8_t> & data,provider::TaskRunner * task_runner)16 MemoryStream::MemoryStream(const std::vector<uint8_t>& data,
17 provider::TaskRunner* task_runner)
18 : data_{data}, task_runner_{task_runner} {}
19
Read(void * buffer,size_t size_to_read,const ReadCallback & callback)20 void MemoryStream::Read(void* buffer,
21 size_t size_to_read,
22 const ReadCallback& callback) {
23 CHECK_LE(read_position_, data_.size());
24 size_t size_read = std::min(size_to_read, data_.size() - read_position_);
25 if (size_read > 0)
26 memcpy(buffer, data_.data() + read_position_, size_read);
27 read_position_ += size_read;
28 task_runner_->PostDelayedTask(FROM_HERE,
29 base::Bind(callback, size_read, nullptr), {});
30 }
31
Write(const void * buffer,size_t size_to_write,const WriteCallback & callback)32 void MemoryStream::Write(const void* buffer,
33 size_t size_to_write,
34 const WriteCallback& callback) {
35 data_.insert(data_.end(), static_cast<const char*>(buffer),
36 static_cast<const char*>(buffer) + size_to_write);
37 task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {});
38 }
39
StreamCopier(InputStream * source,OutputStream * destination)40 StreamCopier::StreamCopier(InputStream* source, OutputStream* destination)
41 : source_{source}, destination_{destination}, buffer_(4096) {}
42
Copy(const InputStream::ReadCallback & callback)43 void StreamCopier::Copy(const InputStream::ReadCallback& callback) {
44 source_->Read(buffer_.data(), buffer_.size(),
45 base::Bind(&StreamCopier::OnReadDone,
46 weak_ptr_factory_.GetWeakPtr(), callback));
47 }
48
OnReadDone(const InputStream::ReadCallback & callback,size_t size,ErrorPtr error)49 void StreamCopier::OnReadDone(const InputStream::ReadCallback& callback,
50 size_t size,
51 ErrorPtr error) {
52 if (error)
53 return callback.Run(0, std::move(error));
54
55 size_done_ += size;
56 if (size) {
57 return destination_->Write(
58 buffer_.data(), size,
59 base::Bind(&StreamCopier::OnWriteDone, weak_ptr_factory_.GetWeakPtr(),
60 callback));
61 }
62 callback.Run(size_done_, nullptr);
63 }
64
OnWriteDone(const InputStream::ReadCallback & callback,ErrorPtr error)65 void StreamCopier::OnWriteDone(const InputStream::ReadCallback& callback,
66 ErrorPtr error) {
67 if (error)
68 return callback.Run(size_done_, std::move(error));
69 Copy(callback);
70 }
71
72 } // namespace weave
73