1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "osp/public/presentation/presentation_connection.h"
6 
7 #include <algorithm>
8 #include <memory>
9 #include <ostream>
10 
11 #include "absl/strings/string_view.h"
12 #include "osp/impl/presentation/presentation_common.h"
13 #include "osp/msgs/osp_messages.h"
14 #include "osp/public/network_service_manager.h"
15 #include "osp/public/presentation/presentation_controller.h"
16 #include "osp/public/presentation/presentation_receiver.h"
17 #include "osp/public/protocol_connection.h"
18 #include "util/osp_logging.h"
19 #include "util/std_util.h"
20 
21 // TODO(crbug.com/openscreen/27): Address TODOs in this file
22 
23 namespace openscreen {
24 namespace osp {
25 
26 namespace {
27 
28 // TODO(jophba): replace Write methods with a unified write message surface
WriteConnectionMessage(const msgs::PresentationConnectionMessage & message,ProtocolConnection * connection)29 Error WriteConnectionMessage(const msgs::PresentationConnectionMessage& message,
30                              ProtocolConnection* connection) {
31   return connection->WriteMessage(message,
32                                   msgs::EncodePresentationConnectionMessage);
33 }
34 }  // namespace
35 
Connection(const PresentationInfo & info,Delegate * delegate,ParentDelegate * parent_delegate)36 Connection::Connection(const PresentationInfo& info,
37                        Delegate* delegate,
38                        ParentDelegate* parent_delegate)
39     : presentation_(info),
40       state_(State::kConnecting),
41       delegate_(delegate),
42       parent_delegate_(parent_delegate),
43       connection_id_(0),
44       protocol_connection_(nullptr) {}
45 
~Connection()46 Connection::~Connection() {
47   if (state_ == State::kConnected) {
48     Close(CloseReason::kDiscarded);
49     delegate_->OnDiscarded();
50   }
51   parent_delegate_->OnConnectionDestroyed(this);
52 }
53 
OnConnecting()54 void Connection::OnConnecting() {
55   OSP_DCHECK(!protocol_connection_);
56   state_ = State::kConnecting;
57 }
58 
OnConnected(uint64_t connection_id,uint64_t endpoint_id,std::unique_ptr<ProtocolConnection> protocol_connection)59 void Connection::OnConnected(
60     uint64_t connection_id,
61     uint64_t endpoint_id,
62     std::unique_ptr<ProtocolConnection> protocol_connection) {
63   if (state_ != State::kConnecting) {
64     return;
65   }
66   connection_id_ = connection_id;
67   endpoint_id_ = endpoint_id;
68   protocol_connection_ = std::move(protocol_connection);
69   state_ = State::kConnected;
70   delegate_->OnConnected();
71 }
72 
OnClosed()73 bool Connection::OnClosed() {
74   if (state_ != State::kConnecting && state_ != State::kConnected)
75     return false;
76 
77   protocol_connection_.reset();
78   state_ = State::kClosed;
79 
80   return true;
81 }
82 
OnClosedByError(Error cause)83 void Connection::OnClosedByError(Error cause) {
84   if (OnClosed()) {
85     std::ostringstream stream;
86     stream << cause;
87     delegate_->OnError(stream.str());
88   }
89 }
90 
OnClosedByRemote()91 void Connection::OnClosedByRemote() {
92   if (OnClosed())
93     delegate_->OnClosedByRemote();
94 }
95 
OnTerminated()96 void Connection::OnTerminated() {
97   if (state_ == State::kTerminated)
98     return;
99   protocol_connection_.reset();
100   state_ = State::kTerminated;
101   delegate_->OnTerminated();
102 }
103 
SendString(absl::string_view message)104 Error Connection::SendString(absl::string_view message) {
105   if (state_ != State::kConnected)
106     return Error::Code::kNoActiveConnection;
107 
108   msgs::PresentationConnectionMessage cbor_message;
109   OSP_LOG_INFO << "sending '" << message << "' to (" << presentation_.id << ", "
110                << connection_id_.value() << ")";
111   cbor_message.connection_id = connection_id_.value();
112   cbor_message.message.which =
113       msgs::PresentationConnectionMessage::Message::Which::kString;
114 
115   new (&cbor_message.message.str) std::string(message);
116 
117   return WriteConnectionMessage(cbor_message, protocol_connection_.get());
118 }
119 
SendBinary(std::vector<uint8_t> && data)120 Error Connection::SendBinary(std::vector<uint8_t>&& data) {
121   if (state_ != State::kConnected)
122     return Error::Code::kNoActiveConnection;
123 
124   msgs::PresentationConnectionMessage cbor_message;
125   OSP_LOG_INFO << "sending " << data.size() << " bytes to (" << presentation_.id
126                << ", " << connection_id_.value() << ")";
127   cbor_message.connection_id = connection_id_.value();
128   cbor_message.message.which =
129       msgs::PresentationConnectionMessage::Message::Which::kBytes;
130 
131   new (&cbor_message.message.bytes) std::vector<uint8_t>(std::move(data));
132 
133   return WriteConnectionMessage(cbor_message, protocol_connection_.get());
134   return Error::None();
135 }
136 
Close(CloseReason reason)137 Error Connection::Close(CloseReason reason) {
138   if (state_ == State::kClosed || state_ == State::kTerminated)
139     return Error::Code::kAlreadyClosed;
140 
141   state_ = State::kClosed;
142   protocol_connection_.reset();
143 
144   return parent_delegate_->CloseConnection(this, reason);
145 }
146 
Terminate(TerminationReason reason)147 void Connection::Terminate(TerminationReason reason) {
148   if (state_ == State::kTerminated)
149     return;
150   state_ = State::kTerminated;
151   protocol_connection_.reset();
152   parent_delegate_->OnPresentationTerminated(presentation_.id, reason);
153 }
154 
ConnectionManager(MessageDemuxer * demuxer)155 ConnectionManager::ConnectionManager(MessageDemuxer* demuxer) {
156   message_watch_ = demuxer->SetDefaultMessageTypeWatch(
157       msgs::Type::kPresentationConnectionMessage, this);
158 
159   close_request_watch_ = demuxer->SetDefaultMessageTypeWatch(
160       msgs::Type::kPresentationConnectionCloseRequest, this);
161 
162   close_event_watch_ = demuxer->SetDefaultMessageTypeWatch(
163       msgs::Type::kPresentationConnectionCloseEvent, this);
164 }
165 
AddConnection(Connection * connection)166 void ConnectionManager::AddConnection(Connection* connection) {
167   auto emplace_result =
168       connections_.emplace(connection->connection_id(), connection);
169 
170   OSP_DCHECK(emplace_result.second);
171 }
172 
RemoveConnection(Connection * connection)173 void ConnectionManager::RemoveConnection(Connection* connection) {
174   auto entry = connections_.find(connection->connection_id());
175   if (entry != connections_.end()) {
176     connections_.erase(entry);
177   }
178 }
179 
180 // TODO(jophba): add a utility object to track requests/responses
181 // TODO(jophba): refine the RegisterWatch/OnStreamMessage API. We
182 // should add a layer between the message logic and the parse/dispatch
183 // logic, and remove the CBOR information from ConnectionManager.
OnStreamMessage(uint64_t endpoint_id,uint64_t connection_id,msgs::Type message_type,const uint8_t * buffer,size_t buffer_size,Clock::time_point now)184 ErrorOr<size_t> ConnectionManager::OnStreamMessage(uint64_t endpoint_id,
185                                                    uint64_t connection_id,
186                                                    msgs::Type message_type,
187                                                    const uint8_t* buffer,
188                                                    size_t buffer_size,
189                                                    Clock::time_point now) {
190   switch (message_type) {
191     case msgs::Type::kPresentationConnectionMessage: {
192       msgs::PresentationConnectionMessage message;
193       ssize_t bytes_decoded = msgs::DecodePresentationConnectionMessage(
194           buffer, buffer_size, &message);
195       if (bytes_decoded < 0) {
196         OSP_LOG_WARN << "presentation-connection-message parse error";
197         return Error::Code::kParseError;
198       }
199 
200       Connection* connection = GetConnection(message.connection_id);
201       if (!connection) {
202         return Error::Code::kItemNotFound;
203       }
204 
205       switch (message.message.which) {
206         case decltype(message.message.which)::kString:
207           connection->get_delegate()->OnStringMessage(message.message.str);
208           break;
209         case decltype(message.message.which)::kBytes:
210           connection->get_delegate()->OnBinaryMessage(message.message.bytes);
211           break;
212         default:
213           OSP_LOG_WARN << "uninitialized message data in "
214                           "presentation-connection-message";
215           break;
216       }
217       return bytes_decoded;
218     }
219 
220     case msgs::Type::kPresentationConnectionCloseRequest: {
221       msgs::PresentationConnectionCloseRequest request;
222       ssize_t bytes_decoded = msgs::DecodePresentationConnectionCloseRequest(
223           buffer, buffer_size, &request);
224       if (bytes_decoded < 0) {
225         OSP_LOG_WARN << "decode presentation-connection-close-request error: "
226                      << bytes_decoded;
227         return Error::Code::kCborInvalidMessage;
228       }
229 
230       msgs::PresentationConnectionCloseResponse response;
231       response.request_id = request.request_id;
232 
233       Connection* connection = GetConnection(request.connection_id);
234       if (connection) {
235         response.result =
236             msgs::PresentationConnectionCloseResponse_result::kSuccess;
237         connection->OnClosedByRemote();
238       } else {
239         response.result = msgs::PresentationConnectionCloseResponse_result::
240             kInvalidConnectionId;
241       }
242 
243       std::unique_ptr<ProtocolConnection> protocol_connection =
244           NetworkServiceManager::Get()
245               ->GetProtocolConnectionServer()
246               ->CreateProtocolConnection(endpoint_id);
247       if (protocol_connection) {
248         protocol_connection->WriteMessage(
249             response, &msgs::EncodePresentationConnectionCloseResponse);
250       }
251 
252       return (response.result ==
253               msgs::PresentationConnectionCloseResponse_result::kSuccess)
254                  ? ErrorOr<size_t>(bytes_decoded)
255                  : Error::Code::kNoActiveConnection;
256     }
257 
258     case msgs::Type::kPresentationConnectionCloseEvent: {
259       msgs::PresentationConnectionCloseEvent event;
260       ssize_t bytes_decoded = msgs::DecodePresentationConnectionCloseEvent(
261           buffer, buffer_size, &event);
262       if (bytes_decoded < 0) {
263         OSP_LOG_WARN << "decode presentation-connection-close-event error: "
264                      << bytes_decoded;
265         return Error::Code::kParseError;
266       }
267 
268       Connection* connection = GetConnection(event.connection_id);
269       if (!connection) {
270         return Error::Code::kNoActiveConnection;
271       }
272 
273       connection->OnClosedByRemote();
274       return bytes_decoded;
275     }
276 
277     // TODO(jophba): The spec says to close the connection if we get a message
278     // we don't understand. Figure out how to honor the spec here.
279     default:
280       return Error::Code::kUnknownMessageType;
281   }
282 }
283 
GetConnection(uint64_t connection_id)284 Connection* ConnectionManager::GetConnection(uint64_t connection_id) {
285   auto entry = connections_.find(connection_id);
286   if (entry != connections_.end()) {
287     return entry->second;
288   }
289 
290   OSP_DVLOG << "unknown ID: " << connection_id;
291   return nullptr;
292 }
293 
294 }  // namespace osp
295 }  // namespace openscreen
296