1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef PLATFORM_IMPL_SOCKET_HANDLE_WAITER_H_
6 #define PLATFORM_IMPL_SOCKET_HANDLE_WAITER_H_
7 
8 #include <condition_variable>
9 #include <functional>
10 #include <memory>
11 #include <mutex>
12 #include <unordered_map>
13 #include <vector>
14 
15 #include "platform/api/time.h"
16 #include "platform/base/error.h"
17 #include "platform/base/macros.h"
18 #include "platform/impl/socket_handle.h"
19 
20 namespace openscreen {
21 
22 // The class responsible for calling platform-level method to watch UDP sockets
23 // for available read data. Reading from these sockets is handled at a higher
24 // layer.
25 class SocketHandleWaiter {
26  public:
27   using SocketHandleRef = std::reference_wrapper<const SocketHandle>;
28 
29   enum Flags {
30     kReadable = 1,
31     kWriteable = 2,
32   };
33 
34   class Subscriber {
35    public:
36     virtual ~Subscriber() = default;
37 
38     // Provides a socket handle to the subscriber which has data waiting to be
39     // processed.
40     virtual void ProcessReadyHandle(SocketHandleRef handle, uint32_t flags) = 0;
41   };
42 
43   explicit SocketHandleWaiter(ClockNowFunctionPtr now_function);
44   virtual ~SocketHandleWaiter() = default;
45 
46   // Start notifying |subscriber| whenever |handle| has an event. May be called
47   // multiple times, to be notified for multiple handles, but should not be
48   // called multiple times for the same handle.
49   void Subscribe(Subscriber* subscriber, SocketHandleRef handle);
50 
51   // Stop receiving notifications for one of the handles currently subscribed
52   // to.
53   void Unsubscribe(Subscriber* subscriber, SocketHandleRef handle);
54 
55   // Stop receiving notifications for all handles currently subscribed to, or
56   // no-op if there are no subscriptions.
57   void UnsubscribeAll(Subscriber* subscriber);
58 
59   // Called when a handle will be deleted to ensure that deletion can proceed
60   // safely.
61   void OnHandleDeletion(Subscriber* subscriber,
62                         SocketHandleRef handle,
63                         bool disable_locking_for_testing = false);
64 
65   OSP_DISALLOW_COPY_AND_ASSIGN(SocketHandleWaiter);
66 
67   // Gets all socket handles to process, checks them for readable data, and
68   // handles any changes that have occured.
69   Error ProcessHandles(Clock::duration timeout);
70 
71  protected:
72   struct ReadyHandle {
73     SocketHandleRef handle;
74     uint32_t flags;
75   };
76 
77   // Waits until data is available in one of the provided sockets or the
78   // provided timeout has passed - whichever is first. If any sockets have data
79   // available, they are returned.
80   virtual ErrorOr<std::vector<ReadyHandle>> AwaitSocketsReadable(
81       const std::vector<SocketHandleRef>& socket_fds,
82       const Clock::duration& timeout) = 0;
83 
84  private:
85   struct SocketSubscription {
86     Subscriber* subscriber = nullptr;
87     Clock::time_point last_updated = Clock::time_point::min();
88   };
89 
90   struct HandleWithSubscription {
91     ReadyHandle ready_handle;
92     // Reference to the original subscription in the unordered map, so
93     // we can keep track of when we updated this socket handle.
94     SocketSubscription* subscription;
95   };
96 
97   // Call the subscriber associated with each changed handle.  Handles are only
98   // processed until |timeout| is exceeded.  Must be called with |mutex_| held.
99   void ProcessReadyHandles(std::vector<HandleWithSubscription>* handles,
100                            Clock::duration timeout);
101 
102   // Guards against concurrent access to all other class data members.
103   std::mutex mutex_;
104 
105   // Blocks deletion of handles until they are no longer being watched.
106   std::condition_variable handle_deletion_block_;
107 
108   // Set of handles currently being deleted, for ensuring handle_deletion_block_
109   // does not exit prematurely.
110   std::vector<SocketHandleRef> handles_being_deleted_;
111 
112   // Set of all socket handles currently being watched, mapped to the subscriber
113   // that is watching them.
114   std::unordered_map<SocketHandleRef, SocketSubscription, SocketHandleHash>
115       handle_mappings_;
116 
117   const ClockNowFunctionPtr now_function_;
118 };
119 
120 }  // namespace openscreen
121 
122 #endif  // PLATFORM_IMPL_SOCKET_HANDLE_WAITER_H_
123