1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <brillo/streams/input_stream_set.h>
6
7 #include <base/bind.h>
8 #include <brillo/message_loops/message_loop.h>
9 #include <brillo/streams/stream_errors.h>
10 #include <brillo/streams/stream_utils.h>
11
12 namespace brillo {
13
InputStreamSet(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,uint64_t initial_stream_size)14 InputStreamSet::InputStreamSet(
15 std::vector<Stream*> source_streams,
16 std::vector<StreamPtr> owned_source_streams,
17 uint64_t initial_stream_size)
18 : source_streams_{std::move(source_streams)},
19 owned_source_streams_{std::move(owned_source_streams)},
20 initial_stream_size_{initial_stream_size} {}
21
Create(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)22 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
23 std::vector<StreamPtr> owned_source_streams,
24 ErrorPtr* error) {
25 StreamPtr stream;
26
27 if (source_streams.empty()) {
28 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
29 errors::stream::kInvalidParameter,
30 "Source stream list is empty");
31 return stream;
32 }
33
34 // Make sure we have only readable streams.
35 for (Stream* src_stream : source_streams) {
36 if (!src_stream->CanRead()) {
37 Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
38 errors::stream::kInvalidParameter,
39 "The stream list must contain only readable streams");
40 return stream;
41 }
42 }
43
44 // We are using remaining size here because the multiplexed stream is not
45 // seekable and the bytes already read are essentially "lost" as far as this
46 // stream is concerned.
47 uint64_t initial_stream_size = 0;
48 for (const Stream* stream : source_streams)
49 initial_stream_size += stream->GetRemainingSize();
50
51 stream.reset(new InputStreamSet{std::move(source_streams),
52 std::move(owned_source_streams),
53 initial_stream_size});
54 return stream;
55 }
56
Create(std::vector<Stream * > source_streams,ErrorPtr * error)57 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
58 ErrorPtr* error) {
59 return Create(std::move(source_streams), {}, error);
60 }
61
Create(std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)62 StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams,
63 ErrorPtr* error) {
64 std::vector<Stream*> source_streams;
65 source_streams.reserve(owned_source_streams.size());
66 for (const StreamPtr& stream : owned_source_streams)
67 source_streams.push_back(stream.get());
68 return Create(std::move(source_streams), std::move(owned_source_streams),
69 error);
70 }
71
IsOpen() const72 bool InputStreamSet::IsOpen() const {
73 return !closed_;
74 }
75
CanGetSize() const76 bool InputStreamSet::CanGetSize() const {
77 bool can_get_size = IsOpen();
78 for (const Stream* stream : source_streams_) {
79 if (!stream->CanGetSize()) {
80 can_get_size = false;
81 break;
82 }
83 }
84 return can_get_size;
85 }
86
GetSize() const87 uint64_t InputStreamSet::GetSize() const {
88 return initial_stream_size_;
89 }
90
SetSizeBlocking(uint64_t,ErrorPtr * error)91 bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
92 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
93 }
94
GetRemainingSize() const95 uint64_t InputStreamSet::GetRemainingSize() const {
96 uint64_t size = 0;
97 for (const Stream* stream : source_streams_)
98 size += stream->GetRemainingSize();
99 return size;
100 }
101
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)102 bool InputStreamSet::Seek(int64_t /* offset */,
103 Whence /* whence */,
104 uint64_t* /* new_position */,
105 ErrorPtr* error) {
106 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
107 }
108
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)109 bool InputStreamSet::ReadNonBlocking(void* buffer,
110 size_t size_to_read,
111 size_t* size_read,
112 bool* end_of_stream,
113 ErrorPtr* error) {
114 if (!IsOpen())
115 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
116
117 while (!source_streams_.empty()) {
118 Stream* stream = source_streams_.front();
119 bool eos = false;
120 if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
121 return false;
122
123 if (*size_read > 0 || !eos) {
124 if (end_of_stream)
125 *end_of_stream = false;
126 return true;
127 }
128
129 source_streams_.erase(source_streams_.begin());
130 }
131 *size_read = 0;
132 if (end_of_stream)
133 *end_of_stream = true;
134 return true;
135 }
136
WriteNonBlocking(const void *,size_t,size_t *,ErrorPtr * error)137 bool InputStreamSet::WriteNonBlocking(const void* /* buffer */,
138 size_t /* size_to_write */,
139 size_t* /* size_written */,
140 ErrorPtr* error) {
141 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
142 }
143
CloseBlocking(ErrorPtr * error)144 bool InputStreamSet::CloseBlocking(ErrorPtr* error) {
145 bool success = true;
146 // We want to close only the owned streams.
147 for (StreamPtr& stream_ptr : owned_source_streams_) {
148 if (!stream_ptr->CloseBlocking(error))
149 success = false; // Keep going for other streams...
150 }
151 owned_source_streams_.clear();
152 source_streams_.clear();
153 initial_stream_size_ = 0;
154 closed_ = true;
155 return success;
156 }
157
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)158 bool InputStreamSet::WaitForData(
159 AccessMode mode,
160 const base::Callback<void(AccessMode)>& callback,
161 ErrorPtr* error) {
162 if (!IsOpen())
163 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
164
165 if (stream_utils::IsWriteAccessMode(mode))
166 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
167
168 if (!source_streams_.empty()) {
169 Stream* stream = source_streams_.front();
170 return stream->WaitForData(mode, callback, error);
171 }
172
173 MessageLoop::current()->PostTask(FROM_HERE, base::Bind(callback, mode));
174 return true;
175 }
176
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)177 bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode,
178 base::TimeDelta timeout,
179 AccessMode* out_mode,
180 ErrorPtr* error) {
181 if (!IsOpen())
182 return stream_utils::ErrorStreamClosed(FROM_HERE, error);
183
184 if (stream_utils::IsWriteAccessMode(in_mode))
185 return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
186
187 if (!source_streams_.empty()) {
188 Stream* stream = source_streams_.front();
189 return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error);
190 }
191
192 if (out_mode)
193 *out_mode = in_mode;
194 return true;
195 }
196
CancelPendingAsyncOperations()197 void InputStreamSet::CancelPendingAsyncOperations() {
198 if (IsOpen() && !source_streams_.empty()) {
199 Stream* stream = source_streams_.front();
200 stream->CancelPendingAsyncOperations();
201 }
202 Stream::CancelPendingAsyncOperations();
203 }
204
205 } // namespace brillo
206