1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
6 #define MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
7 
8 #include <stdint.h>
9 
10 #include <map>
11 #include <memory>
12 #include <string>
13 
14 #include "base/compiler_specific.h"
15 #include "base/containers/queue.h"
16 #include "base/containers/small_map.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/memory/ref_counted.h"
20 #include "base/memory/weak_ptr.h"
21 #include "base/optional.h"
22 #include "base/sequence_checker.h"
23 #include "base/sequenced_task_runner.h"
24 #include "base/synchronization/lock.h"
25 #include "mojo/public/cpp/bindings/associated_group_controller.h"
26 #include "mojo/public/cpp/bindings/bindings_export.h"
27 #include "mojo/public/cpp/bindings/connector.h"
28 #include "mojo/public/cpp/bindings/filter_chain.h"
29 #include "mojo/public/cpp/bindings/interface_id.h"
30 #include "mojo/public/cpp/bindings/message_header_validator.h"
31 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
32 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
33 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
34 #include "mojo/public/cpp/bindings/scoped_interface_endpoint_handle.h"
35 
36 namespace base {
   // Forward declaration of the task-runner type used in the signatures below.
   // NOTE(review): base/sequenced_task_runner.h is already included above, so
   // this forward declaration looks redundant -- confirm before removing.
37 class SequencedTaskRunner;
38 }  // namespace base
39 
40 namespace mojo {
41 
42 namespace internal {
43 
44 // MultiplexRouter supports routing messages for multiple interfaces over a
45 // single message pipe.
46 //
47 // It is created on the sequence where the master interface of the message pipe
48 // lives. Although it is ref-counted, it is guaranteed to be destructed on the
49 // same sequence.
50 // Some public methods are only allowed to be called on the creating sequence;
51 // while the others are safe to call from any sequence. Please see the method
52 // comments for more details.
53 //
54 // NOTE: CloseMessagePipe() or PassMessagePipe() MUST be called on |runner|'s
55 // sequence before this object is destroyed.
56 class MOJO_CPP_BINDINGS_EXPORT MultiplexRouter
57     : public MessageReceiver,
58       public AssociatedGroupController,
59       public PipeControlMessageHandlerDelegate {
60  public:
     // Operating mode of the router; passed to the constructor.
61   enum Config {
62     // There is only the master interface running on this router. Please note
63     // that because of interface versioning, the other side of the message pipe
64     // may use a newer master interface definition which passes associated
65     // interfaces. In that case, this router may still receive pipe control
66     // messages or messages targeting associated interfaces.
67     SINGLE_INTERFACE,
68     // Similar to the mode above, there is only the master interface running on
69     // this router. Besides, the master interface has sync methods.
70     SINGLE_INTERFACE_WITH_SYNC_METHODS,
71     // There may be associated interfaces running on this router.
72     MULTI_INTERFACE
73   };
74 
75   // If |set_interface_id_namespace_bit| is true, the interface IDs generated by
76   // this router will have the highest bit set.
77   MultiplexRouter(ScopedMessagePipeHandle message_pipe,
78                   Config config,
79                   bool set_interface_id_namespace_bit,
80                   scoped_refptr<base::SequencedTaskRunner> runner);
81 
82   // Adds a MessageReceiver which can filter a message after validation but
83   // before dispatch.
84   void AddIncomingMessageFilter(std::unique_ptr<MessageReceiver> filter);
85 
86   // Sets the master interface name for this router. Only used when reporting
87   // message header or control message validation errors.
88   // |name| must be a string literal.
89   void SetMasterInterfaceName(const char* name);
90 
91   // ---------------------------------------------------------------------------
92   // The following public methods are safe to call from any sequence.
93 
94   // AssociatedGroupController implementation:
95   InterfaceId AssociateInterface(
96       ScopedInterfaceEndpointHandle handle_to_send) override;
97   ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
98       InterfaceId id) override;
99   void CloseEndpointHandle(
100       InterfaceId id,
101       const base::Optional<DisconnectReason>& reason) override;
102   InterfaceEndpointController* AttachEndpointClient(
103       const ScopedInterfaceEndpointHandle& handle,
104       InterfaceEndpointClient* endpoint_client,
105       scoped_refptr<base::SequencedTaskRunner> runner) override;
106   void DetachEndpointClient(
107       const ScopedInterfaceEndpointHandle& handle) override;
108   void RaiseError() override;
109   bool PrefersSerializedMessages() override;
110 
111   // ---------------------------------------------------------------------------
112   // The following public methods are called on the creating sequence.
113 
114   // Please note that this method shouldn't be called unless it results from an
115   // explicit request of the user of bindings (e.g., the user sets an
116   // InterfacePtr to null or closes a Binding).
117   void CloseMessagePipe();
118 
119   // Extracts the underlying message pipe.
      // DCHECKs that the call happens on the valid sequence and that there are
      // no associated endpoints (HasAssociatedEndpoints() is false), then hands
      // back whatever |connector_| returns from PassMessagePipe().
PassMessagePipe()120   ScopedMessagePipeHandle PassMessagePipe() {
121     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
122     DCHECK(!HasAssociatedEndpoints());
123     return connector_.PassMessagePipe();
124   }
125 
126   // Blocks the current sequence until the first incoming message, or
127   // |deadline|.
      // The return value is forwarded unchanged from
      // Connector::WaitForIncomingMessage().
WaitForIncomingMessage(MojoDeadline deadline)128   bool WaitForIncomingMessage(MojoDeadline deadline) {
129     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
130     return connector_.WaitForIncomingMessage(deadline);
131   }
132 
133   // See Binding for details of pause/resume.
134   void PauseIncomingMethodCallProcessing();
135   void ResumeIncomingMethodCallProcessing();
136 
137   // Whether there are any associated interfaces running currently.
138   bool HasAssociatedEndpoints() const;
139 
140   // Sets this object to testing mode.
141   // In testing mode, the object doesn't disconnect the underlying message pipe
142   // when it receives unexpected or invalid messages.
143   void EnableTestingMode();
144 
145   // Is the router bound to a message pipe handle?
      // Answers by querying |connector_|; DCHECKs the calling sequence.
is_valid()146   bool is_valid() const {
147     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
148     return connector_.is_valid();
149   }
150 
151   // TODO(yzshen): consider removing this getter.
      // Returns the handle currently held by |connector_|; DCHECKs the calling
      // sequence.
handle()152   MessagePipeHandle handle() const {
153     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
154     return connector_.handle();
155   }
156 
      // Test-only hook: passes |message| straight to |filters_| without going
      // through |connector_|, and returns the result of FilterChain::Accept().
      // Unlike the neighbouring inline methods, no sequence check is made here.
SimulateReceivingMessageForTesting(Message * message)157   bool SimulateReceivingMessageForTesting(Message* message) {
158     return filters_.Accept(message);
159   }
160 
161  private:
      // Nested helper types; only declared here, so their definitions are not
      // visible from this header.
162   class InterfaceEndpoint;
163   class MessageWrapper;
164   struct Task;
165 
      // Private, so outside code cannot delete a router directly.
      // NOTE(review): presumably invoked via the ref-counting machinery of a
      // base class -- confirm against AssociatedGroupController.
166   ~MultiplexRouter() override;
167 
168   // MessageReceiver implementation:
169   bool Accept(Message* message) override;
170 
171   // PipeControlMessageHandlerDelegate implementation:
172   bool OnPeerAssociatedEndpointClosed(
173       InterfaceId id,
174       const base::Optional<DisconnectReason>& reason) override;
175 
      // NOTE(review): presumably the connection-error callback registered on
      // |connector_| -- confirm in the .cc file.
176   void OnPipeConnectionError();
177 
178   // Specifies whether we are allowed to directly call into
179   // InterfaceEndpointClient (given that we are already on the same sequence as
180   // the client).
181   enum ClientCallBehavior {
182     // Don't call any InterfaceEndpointClient methods directly.
183     NO_DIRECT_CLIENT_CALLS,
184     // Only call InterfaceEndpointClient::HandleIncomingMessage directly to
185     // handle sync messages.
186     ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES,
187     // Allow to call any InterfaceEndpointClient methods directly.
188     ALLOW_DIRECT_CLIENT_CALLS
189   };
190 
191   // Processes enqueued tasks (incoming messages and error notifications).
192   // |current_task_runner| is only used when |client_call_behavior| is
193   // ALLOW_DIRECT_CLIENT_CALLS to determine whether we are on the right task
194   // runner to make client calls for async messages or connection error
195   // notifications.
196   //
197   // Note: Because calling into InterfaceEndpointClient may lead to destruction
198   // of this object, if direct calls are allowed, the caller needs to hold on to
199   // a ref outside of |lock_| before calling this method.
200   void ProcessTasks(ClientCallBehavior client_call_behavior,
201                     base::SequencedTaskRunner* current_task_runner);
202 
203   // Processes the first queued sync message for the endpoint corresponding to
204   // |id|; returns whether there are more sync messages for that endpoint in the
205   // queue.
206   //
207   // This method is only used by endpoints during sync watching. Therefore, not
208   // all sync messages are handled by it.
209   bool ProcessFirstSyncMessageForEndpoint(InterfaceId id);
210 
211   // Returns true to indicate that |task|/|message| has been processed.
212   bool ProcessNotifyErrorTask(Task* task,
213                               ClientCallBehavior client_call_behavior,
214                               base::SequencedTaskRunner* current_task_runner);
215   bool ProcessIncomingMessage(MessageWrapper* message_wrapper,
216                               ClientCallBehavior client_call_behavior,
217                               base::SequencedTaskRunner* current_task_runner);
218 
      // NOTE(review): presumably these schedule and then run ProcessTasks() on
      // |task_runner|, tracked via |posted_to_process_tasks_| -- confirm in the
      // .cc file.
219   void MaybePostToProcessTasks(base::SequencedTaskRunner* task_runner);
220   void LockAndCallProcessTasks();
221 
222   // Updates the state of |endpoint|. If both the endpoint and its peer have
223   // been closed, removes it from |endpoints_|.
224   // NOTE: The method may invalidate |endpoint|.
225   enum EndpointStateUpdateType { ENDPOINT_CLOSED, PEER_ENDPOINT_CLOSED };
226   void UpdateEndpointStateMayRemove(InterfaceEndpoint* endpoint,
227                                     EndpointStateUpdateType type);
228 
229   void RaiseErrorInNonTestingMode();
230 
      // Lookup helpers for |endpoints_|, keyed by interface ID.
      // NOTE(review): |inserted| is presumably an out-parameter reporting
      // whether a new entry was created -- confirm in the .cc file.
231   InterfaceEndpoint* FindOrInsertEndpoint(InterfaceId id, bool* inserted);
232   InterfaceEndpoint* FindEndpoint(InterfaceId id);
233 
234   // Returns false if some interface IDs are invalid or have been used.
235   bool InsertEndpointsForMessage(const Message& message);
236   void CloseEndpointsForMessage(const Message& message);
237 
      // NOTE(review): presumably asserts that |lock_| is held when it is set
      // -- confirm in the .cc file.
238   void AssertLockAcquired();
239 
240   // Whether to set the namespace bit when generating interface IDs. Please see
241   // comments of kInterfaceIdNamespaceMask.
242   const bool set_interface_id_namespace_bit_;
243 
244   scoped_refptr<base::SequencedTaskRunner> task_runner_;
245 
246   // Owned by |filters_| below.
247   MessageHeaderValidator* header_validator_ = nullptr;
248 
249   FilterChain filters_;
250   Connector connector_;
251 
      // Backs the DCHECK_CALLED_ON_VALID_SEQUENCE() checks in the inline
      // methods above.
252   SEQUENCE_CHECKER(sequence_checker_);
253 
254   // Protects the following members.
255   // Not set in Config::SINGLE_INTERFACE* mode.
256   mutable base::Optional<base::Lock> lock_;
257   PipeControlMessageHandler control_message_handler_;
258 
259   // NOTE: It is unsafe to call into this object while holding |lock_|.
260   PipeControlMessageProxy control_message_proxy_;
261 
      // Endpoints known to this router, keyed by interface ID.
262   base::small_map<std::map<InterfaceId, scoped_refptr<InterfaceEndpoint>>, 1>
263       endpoints_;
      // Starts at 1. NOTE(review): presumably the counter from which new
      // interface IDs are generated -- confirm in the .cc file.
264   uint32_t next_interface_id_value_ = 1;
265 
      // Owning queue of pending tasks (see ProcessTasks()).
266   base::circular_deque<std::unique_ptr<Task>> tasks_;
267   // It refers to tasks in |tasks_| and doesn't own any of them.
268   std::map<InterfaceId, base::circular_deque<Task*>> sync_message_tasks_;
269 
270   bool posted_to_process_tasks_ = false;
271   scoped_refptr<base::SequencedTaskRunner> posted_to_task_runner_;
272 
273   bool encountered_error_ = false;
274 
      // NOTE(review): presumably toggled by Pause/ResumeIncomingMethodCall-
      // Processing() -- confirm in the .cc file.
275   bool paused_ = false;
276 
      // NOTE(review): presumably set by EnableTestingMode() -- confirm in the
      // .cc file.
277   bool testing_mode_ = false;
278 
279   bool being_destructed_ = false;
280 
281   DISALLOW_COPY_AND_ASSIGN(MultiplexRouter);
282 };
283 
284 }  // namespace internal
285 }  // namespace mojo
286 
287 #endif  // MOJO_PUBLIC_CPP_BINDINGS_LIB_MULTIPLEX_ROUTER_H_
288