1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
7 
8 #include <stdint.h>
9 
10 #include <deque>
11 #include <map>
12 #include <memory>
13 #include <string>
14 
15 #include "base/logging.h"
16 #include "base/macros.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/memory/weak_ptr.h"
19 #include "base/single_thread_task_runner.h"
20 #include "base/synchronization/lock.h"
21 #include "base/threading/thread_checker.h"
22 #include "mojo/public/cpp/bindings/associated_group_controller.h"
23 #include "mojo/public/cpp/bindings/connector.h"
24 #include "mojo/public/cpp/bindings/interface_id.h"
25 #include "mojo/public/cpp/bindings/message_header_validator.h"
26 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
27 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
28 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
29 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
30 
31 namespace base {
32 class SingleThreadTaskRunner;
33 }
34 
35 namespace mojo {
36 
37 class AssociatedGroup;
38 
39 namespace internal {
40 
41 // MultiplexRouter supports routing messages for multiple interfaces over a
42 // single message pipe.
43 //
44 // It is created on the thread where the master interface of the message pipe
45 // lives. Although it is ref-counted, it is guarateed to be destructed on the
46 // same thread.
47 // Some public methods are only allowed to be called on the creating thread;
48 // while the others are safe to call from any threads. Please see the method
49 // comments for more details.
50 class MultiplexRouter
51     : public MessageReceiver,
52       public AssociatedGroupController,
53       public PipeControlMessageHandlerDelegate {
54  public:
55   // If |set_interface_id_namespace_bit| is true, the interface IDs generated by
56   // this router will have the highest bit set.
57   MultiplexRouter(bool set_interface_id_namespace_bit,
58                   ScopedMessagePipeHandle message_pipe,
59                   scoped_refptr<base::SingleThreadTaskRunner> runner);
60 
61   // Sets the master interface name for this router. Only used when reporting
62   // message header or control message validation errors.
63   void SetMasterInterfaceName(const std::string& name);
64 
65   // ---------------------------------------------------------------------------
66   // The following public methods are safe to call from any threads.
67 
68   // AssociatedGroupController implementation:
69   void CreateEndpointHandlePair(
70       ScopedInterfaceEndpointHandle* local_endpoint,
71       ScopedInterfaceEndpointHandle* remote_endpoint) override;
72   ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
73       InterfaceId id) override;
74   void CloseEndpointHandle(InterfaceId id, bool is_local) override;
75   InterfaceEndpointController* AttachEndpointClient(
76       const ScopedInterfaceEndpointHandle& handle,
77       InterfaceEndpointClient* endpoint_client,
78       scoped_refptr<base::SingleThreadTaskRunner> runner) override;
79   void DetachEndpointClient(
80       const ScopedInterfaceEndpointHandle& handle) override;
81   void RaiseError() override;
82 
83   // ---------------------------------------------------------------------------
84   // The following public methods are called on the creating thread.
85 
86   // Please note that this method shouldn't be called unless it results from an
87   // explicit request of the user of bindings (e.g., the user sets an
88   // InterfacePtr to null or closes a Binding).
89   void CloseMessagePipe();
90 
91   // Extracts the underlying message pipe.
PassMessagePipe()92   ScopedMessagePipeHandle PassMessagePipe() {
93     DCHECK(thread_checker_.CalledOnValidThread());
94     DCHECK(!HasAssociatedEndpoints());
95     return connector_.PassMessagePipe();
96   }
97 
98   // Blocks the current thread until the first incoming message, or |deadline|.
WaitForIncomingMessage(MojoDeadline deadline)99   bool WaitForIncomingMessage(MojoDeadline deadline) {
100     DCHECK(thread_checker_.CalledOnValidThread());
101     return connector_.WaitForIncomingMessage(deadline);
102   }
103 
104   // See Binding for details of pause/resume.
PauseIncomingMethodCallProcessing()105   void PauseIncomingMethodCallProcessing() {
106     DCHECK(thread_checker_.CalledOnValidThread());
107     connector_.PauseIncomingMethodCallProcessing();
108   }
ResumeIncomingMethodCallProcessing()109   void ResumeIncomingMethodCallProcessing() {
110     DCHECK(thread_checker_.CalledOnValidThread());
111     connector_.ResumeIncomingMethodCallProcessing();
112   }
113 
114   // Whether there are any associated interfaces running currently.
115   bool HasAssociatedEndpoints() const;
116 
117   // Sets this object to testing mode.
118   // In testing mode, the object doesn't disconnect the underlying message pipe
119   // when it receives unexpected or invalid messages.
120   void EnableTestingMode();
121 
122   // Is the router bound to a message pipe handle?
is_valid()123   bool is_valid() const {
124     DCHECK(thread_checker_.CalledOnValidThread());
125     return connector_.is_valid();
126   }
127 
128   // TODO(yzshen): consider removing this getter.
handle()129   MessagePipeHandle handle() const {
130     DCHECK(thread_checker_.CalledOnValidThread());
131     return connector_.handle();
132   }
133 
134  private:
135   class InterfaceEndpoint;
136   struct Task;
137 
138   ~MultiplexRouter() override;
139 
140   // MessageReceiver implementation:
141   bool Accept(Message* message) override;
142 
143   // PipeControlMessageHandlerDelegate implementation:
144   bool OnPeerAssociatedEndpointClosed(InterfaceId id) override;
145   bool OnAssociatedEndpointClosedBeforeSent(InterfaceId id) override;
146 
147   void OnPipeConnectionError();
148 
149   // Specifies whether we are allowed to directly call into
150   // InterfaceEndpointClient (given that we are already on the same thread as
151   // the client).
152   enum ClientCallBehavior {
153     // Don't call any InterfaceEndpointClient methods directly.
154     NO_DIRECT_CLIENT_CALLS,
155     // Only call InterfaceEndpointClient::HandleIncomingMessage directly to
156     // handle sync messages.
157     ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
158     // Allow to call any InterfaceEndpointClient methods directly.
159     ALLOW_DIRECT_CLIENT_CALLS
160   };
161 
162   // Processes enqueued tasks (incoming messages and error notifications).
163   // |current_task_runner| is only used when |client_call_behavior| is
164   // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task
165   // runner to make client calls for async messages or connection error
166   // notifications.
167   //
168   // Note: Because calling into InterfaceEndpointClient may lead to destruction
169   // of this object, if direct calls are allowed, the caller needs to hold on to
170   // a ref outside of |lock_| before calling this method.
171   void ProcessTasks(ClientCallBehavior client_call_behavior,
172                     base::SingleThreadTaskRunner* current_task_runner);
173 
174   // Processes the first queued sync message for the endpoint corresponding to
175   // |id|; returns whether there are more sync messages for that endpoint in the
176   // queue.
177   //
178   // This method is only used by enpoints during sync watching. Therefore, not
179   // all sync messages are handled by it.
180   bool ProcessFirstSyncMessageForEndpoint(InterfaceId id);
181 
182   // Returns true to indicate that |task|/|message| has been processed.
183   bool ProcessNotifyErrorTask(
184       Task* task,
185       ClientCallBehavior client_call_behavior,
186       base::SingleThreadTaskRunner* current_task_runner);
187   bool ProcessIncomingMessage(
188       Message* message,
189       ClientCallBehavior client_call_behavior,
190       base::SingleThreadTaskRunner* current_task_runner);
191 
192   void MaybePostToProcessTasks(base::SingleThreadTaskRunner* task_runner);
193   void LockAndCallProcessTasks();
194 
195   // Updates the state of |endpoint|. If both the endpoint and its peer have
196   // been closed, removes it from |endpoints_|.
197   // NOTE: The method may invalidate |endpoint|.
198   enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED };
199   void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint,
200                                     EndpointStateUpdateType type);
201 
202   void RaiseErrorInNonTestingMode();
203 
204   InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted);
205 
206   // Whether to set the namespace bit when generating interface IDs. Please see
207   // comments of kInterfaceIdNamespaceMask.
208   const bool set_interface_id_namespace_bit_;
209 
210   MessageHeaderValidator header_validator_;
211   Connector connector_;
212 
213   base::ThreadChecker thread_checker_;
214 
215   // Protects the following members.
216   mutable base::Lock lock_;
217   PipeControlMessageHandler control_message_handler_;
218   PipeControlMessageProxy control_message_proxy_;
219 
220   std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>> endpoints_;
221   uint32_t next_interface_id_value_;
222 
223   std::deque<std::unique_ptr<Task>> tasks_;
224   // It refers to tasks in |tasks_| and doesn't own any of them.
225   std::map<InterfaceId, std::deque<Task*>> sync_message_tasks_;
226 
227   bool posted_to_process_tasks_;
228   scoped_refptr<base::SingleThreadTaskRunner> posted_to_task_runner_;
229 
230   bool encountered_error_;
231 
232   bool testing_mode_;
233 
234   DISALLOW_COPY_AND_ASSIGN(MultiplexRouter);
235 };
236 
237 }  // namespace internal
238 }  // namespace mojo
239 
240 #endif  // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
241