1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/streams/input_stream_set.h>
6 
7 #include <base/bind.h>
8 #include <brillo/message_loops/message_loop.h>
9 #include <brillo/streams/stream_errors.h>
10 #include <brillo/streams/stream_utils.h>
11 
12 namespace brillo {
13 
InputStreamSet(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,uint64_t initial_stream_size)14 InputStreamSet::InputStreamSet(
15     std::vector<Stream*> source_streams,
16     std::vector<StreamPtr> owned_source_streams,
17     uint64_t initial_stream_size)
18     : source_streams_{std::move(source_streams)},
19       owned_source_streams_{std::move(owned_source_streams)},
20       initial_stream_size_{initial_stream_size} {}
21 
Create(std::vector<Stream * > source_streams,std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)22 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
23                                  std::vector<StreamPtr> owned_source_streams,
24                                  ErrorPtr* error) {
25   StreamPtr stream;
26 
27   if (source_streams.empty()) {
28     Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
29                  errors::stream::kInvalidParameter,
30                  "Source stream list is empty");
31     return stream;
32   }
33 
34   // Make sure we have only readable streams.
35   for (Stream* src_stream : source_streams) {
36     if (!src_stream->CanRead()) {
37       Error::AddTo(error, FROM_HERE, errors::stream::kDomain,
38                    errors::stream::kInvalidParameter,
39                    "The stream list must contain only readable streams");
40       return stream;
41     }
42   }
43 
44   // We are using remaining size here because the multiplexed stream is not
45   // seekable and the bytes already read are essentially "lost" as far as this
46   // stream is concerned.
47   uint64_t initial_stream_size = 0;
48   for (const Stream* stream : source_streams)
49     initial_stream_size += stream->GetRemainingSize();
50 
51   stream.reset(new InputStreamSet{std::move(source_streams),
52                                   std::move(owned_source_streams),
53                                   initial_stream_size});
54   return stream;
55 }
56 
Create(std::vector<Stream * > source_streams,ErrorPtr * error)57 StreamPtr InputStreamSet::Create(std::vector<Stream*> source_streams,
58                                  ErrorPtr* error) {
59   return Create(std::move(source_streams), {}, error);
60 }
61 
Create(std::vector<StreamPtr> owned_source_streams,ErrorPtr * error)62 StreamPtr InputStreamSet::Create(std::vector<StreamPtr> owned_source_streams,
63                                  ErrorPtr* error) {
64   std::vector<Stream*> source_streams;
65   source_streams.reserve(owned_source_streams.size());
66   for (const StreamPtr& stream : owned_source_streams)
67     source_streams.push_back(stream.get());
68   return Create(std::move(source_streams), std::move(owned_source_streams),
69                 error);
70 }
71 
IsOpen() const72 bool InputStreamSet::IsOpen() const {
73   return !closed_;
74 }
75 
CanGetSize() const76 bool InputStreamSet::CanGetSize() const {
77   bool can_get_size = IsOpen();
78   for (const Stream* stream : source_streams_) {
79     if (!stream->CanGetSize()) {
80       can_get_size = false;
81       break;
82     }
83   }
84   return can_get_size;
85 }
86 
GetSize() const87 uint64_t InputStreamSet::GetSize() const {
88   return initial_stream_size_;
89 }
90 
SetSizeBlocking(uint64_t,ErrorPtr * error)91 bool InputStreamSet::SetSizeBlocking(uint64_t /* size */, ErrorPtr* error) {
92   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
93 }
94 
GetRemainingSize() const95 uint64_t InputStreamSet::GetRemainingSize() const {
96   uint64_t size = 0;
97   for (const Stream* stream : source_streams_)
98     size += stream->GetRemainingSize();
99   return size;
100 }
101 
Seek(int64_t,Whence,uint64_t *,ErrorPtr * error)102 bool InputStreamSet::Seek(int64_t /* offset */,
103                           Whence /* whence */,
104                           uint64_t* /* new_position */,
105                           ErrorPtr* error) {
106   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
107 }
108 
ReadNonBlocking(void * buffer,size_t size_to_read,size_t * size_read,bool * end_of_stream,ErrorPtr * error)109 bool InputStreamSet::ReadNonBlocking(void* buffer,
110                                      size_t size_to_read,
111                                      size_t* size_read,
112                                      bool* end_of_stream,
113                                      ErrorPtr* error) {
114   if (!IsOpen())
115     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
116 
117   while (!source_streams_.empty()) {
118     Stream* stream = source_streams_.front();
119     bool eos = false;
120     if (!stream->ReadNonBlocking(buffer, size_to_read, size_read, &eos, error))
121       return false;
122 
123     if (*size_read > 0 || !eos) {
124       if (end_of_stream)
125         *end_of_stream = false;
126       return true;
127     }
128 
129     source_streams_.erase(source_streams_.begin());
130   }
131   *size_read = 0;
132   if (end_of_stream)
133     *end_of_stream = true;
134   return true;
135 }
136 
WriteNonBlocking(const void *,size_t,size_t *,ErrorPtr * error)137 bool InputStreamSet::WriteNonBlocking(const void* /* buffer */,
138                                       size_t /* size_to_write */,
139                                       size_t* /* size_written */,
140                                       ErrorPtr* error) {
141   return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
142 }
143 
CloseBlocking(ErrorPtr * error)144 bool InputStreamSet::CloseBlocking(ErrorPtr* error) {
145   bool success = true;
146   // We want to close only the owned streams.
147   for (StreamPtr& stream_ptr : owned_source_streams_) {
148     if (!stream_ptr->CloseBlocking(error))
149       success = false;  // Keep going for other streams...
150   }
151   owned_source_streams_.clear();
152   source_streams_.clear();
153   initial_stream_size_ = 0;
154   closed_ = true;
155   return success;
156 }
157 
WaitForData(AccessMode mode,const base::Callback<void (AccessMode)> & callback,ErrorPtr * error)158 bool InputStreamSet::WaitForData(
159     AccessMode mode,
160     const base::Callback<void(AccessMode)>& callback,
161     ErrorPtr* error) {
162   if (!IsOpen())
163     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
164 
165   if (stream_utils::IsWriteAccessMode(mode))
166     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
167 
168   if (!source_streams_.empty()) {
169     Stream* stream = source_streams_.front();
170     return stream->WaitForData(mode, callback, error);
171   }
172 
173   MessageLoop::current()->PostTask(FROM_HERE, base::Bind(callback, mode));
174   return true;
175 }
176 
WaitForDataBlocking(AccessMode in_mode,base::TimeDelta timeout,AccessMode * out_mode,ErrorPtr * error)177 bool InputStreamSet::WaitForDataBlocking(AccessMode in_mode,
178                                          base::TimeDelta timeout,
179                                          AccessMode* out_mode,
180                                          ErrorPtr* error) {
181   if (!IsOpen())
182     return stream_utils::ErrorStreamClosed(FROM_HERE, error);
183 
184   if (stream_utils::IsWriteAccessMode(in_mode))
185     return stream_utils::ErrorOperationNotSupported(FROM_HERE, error);
186 
187   if (!source_streams_.empty()) {
188     Stream* stream = source_streams_.front();
189     return stream->WaitForDataBlocking(in_mode, timeout, out_mode, error);
190   }
191 
192   if (out_mode)
193     *out_mode = in_mode;
194   return true;
195 }
196 
CancelPendingAsyncOperations()197 void InputStreamSet::CancelPendingAsyncOperations() {
198   if (IsOpen() && !source_streams_.empty()) {
199     Stream* stream = source_streams_.front();
200     stream->CancelPendingAsyncOperations();
201   }
202   Stream::CancelPendingAsyncOperations();
203 }
204 
205 }  // namespace brillo
206