1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "platform/impl/tls_data_router_posix.h"
6 
7 #include <memory>
8 #include <utility>
9 
10 #include "platform/impl/stream_socket_posix.h"
11 #include "platform/impl/tls_connection_posix.h"
12 #include "util/osp_logging.h"
13 
14 namespace openscreen {
15 
TlsDataRouterPosix(SocketHandleWaiter * waiter,std::function<Clock::time_point ()> now_function)16 TlsDataRouterPosix::TlsDataRouterPosix(
17     SocketHandleWaiter* waiter,
18     std::function<Clock::time_point()> now_function)
19     : waiter_(waiter), now_function_(now_function) {}
20 
~TlsDataRouterPosix()21 TlsDataRouterPosix::~TlsDataRouterPosix() {
22   waiter_->UnsubscribeAll(this);
23 }
24 
RegisterConnection(TlsConnectionPosix * connection)25 void TlsDataRouterPosix::RegisterConnection(TlsConnectionPosix* connection) {
26   {
27     std::lock_guard<std::mutex> lock(connections_mutex_);
28     OSP_DCHECK(std::find(connections_.begin(), connections_.end(),
29                          connection) == connections_.end());
30     connections_.push_back(connection);
31   }
32 
33   waiter_->Subscribe(this, connection->socket_handle());
34 }
35 
DeregisterConnection(TlsConnectionPosix * connection)36 void TlsDataRouterPosix::DeregisterConnection(TlsConnectionPosix* connection) {
37   {
38     std::lock_guard<std::mutex> lock(connections_mutex_);
39     auto it = std::remove_if(
40         connections_.begin(), connections_.end(),
41         [connection](TlsConnectionPosix* conn) { return conn == connection; });
42     if (it == connections_.end()) {
43       return;
44     }
45     connections_.erase(it, connections_.end());
46   }
47 
48   waiter_->OnHandleDeletion(this, connection->socket_handle(),
49                             disable_locking_for_testing_);
50 }
51 
RegisterAcceptObserver(std::unique_ptr<StreamSocketPosix> socket,SocketObserver * observer)52 void TlsDataRouterPosix::RegisterAcceptObserver(
53     std::unique_ptr<StreamSocketPosix> socket,
54     SocketObserver* observer) {
55   OSP_DCHECK(observer);
56   StreamSocketPosix* socket_ptr = socket.get();
57   {
58     std::unique_lock<std::mutex> lock(accept_socket_mutex_);
59     accept_stream_sockets_.push_back(std::move(socket));
60     accept_socket_mappings_[socket_ptr] = observer;
61   }
62 
63   waiter_->Subscribe(this, socket_ptr->socket_handle());
64 }
65 
DeregisterAcceptObserver(SocketObserver * observer)66 void TlsDataRouterPosix::DeregisterAcceptObserver(SocketObserver* observer) {
67   std::vector<std::unique_ptr<StreamSocketPosix>> sockets_to_delete;
68   {
69     std::unique_lock<std::mutex> lock(accept_socket_mutex_);
70     for (auto it = accept_stream_sockets_.begin();
71          it != accept_stream_sockets_.end();) {
72       auto map_entry = accept_socket_mappings_.find(it->get());
73       OSP_DCHECK(map_entry != accept_socket_mappings_.end());
74       if (map_entry->second == observer) {
75         sockets_to_delete.push_back(std::move(*it));
76         accept_socket_mappings_.erase(map_entry);
77         it = accept_stream_sockets_.erase(it);
78       } else {
79         ++it;
80       }
81     }
82   }
83 
84   for (auto& socket : sockets_to_delete) {
85     waiter_->OnHandleDeletion(this, socket->socket_handle(),
86                               disable_locking_for_testing_);
87   }
88 }
89 
ProcessReadyHandle(SocketHandleWaiter::SocketHandleRef handle,uint32_t flags)90 void TlsDataRouterPosix::ProcessReadyHandle(
91     SocketHandleWaiter::SocketHandleRef handle,
92     uint32_t flags) {
93   if (flags & SocketHandleWaiter::Flags::kReadable) {
94     std::unique_lock<std::mutex> lock(accept_socket_mutex_);
95     for (const auto& pair : accept_socket_mappings_) {
96       if (pair.first->socket_handle() == handle) {
97         pair.second->OnConnectionPending(pair.first);
98         return;
99       }
100     }
101   }
102   {
103     std::lock_guard<std::mutex> lock(connections_mutex_);
104     for (TlsConnectionPosix* connection : connections_) {
105       if (connection->socket_handle() == handle) {
106         if (flags & SocketHandleWaiter::Flags::kReadable) {
107           connection->TryReceiveMessage();
108         }
109         if (flags & SocketHandleWaiter::Flags::kWriteable) {
110           connection->SendAvailableBytes();
111         }
112         return;
113       }
114     }
115   }
116 }
117 
HasTimedOut(Clock::time_point start_time,Clock::duration timeout)118 bool TlsDataRouterPosix::HasTimedOut(Clock::time_point start_time,
119                                      Clock::duration timeout) {
120   return now_function_() - start_time > timeout;
121 }
122 
IsSocketWatched(StreamSocketPosix * socket) const123 bool TlsDataRouterPosix::IsSocketWatched(StreamSocketPosix* socket) const {
124   std::unique_lock<std::mutex> lock(accept_socket_mutex_);
125   return accept_socket_mappings_.find(socket) != accept_socket_mappings_.end();
126 }
127 
128 }  // namespace openscreen
129