1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_
6 #define MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_
7 
8 #include <stdint.h>
9 
10 #include <memory>
11 #include <queue>
12 
13 #include "base/macros.h"
14 #include "base/optional.h"
15 #include "mojo/core/atomic_flag.h"
16 #include "mojo/core/dispatcher.h"
17 #include "mojo/core/ports/port_ref.h"
18 #include "mojo/core/watcher_set.h"
19 
20 namespace mojo {
21 namespace core {
22 
23 class NodeController;
24 
25 class MessagePipeDispatcher : public Dispatcher {
26  public:
27   // Constructs a MessagePipeDispatcher permanently tied to a specific port.
28   // |connected| must indicate the state of the port at construction time; if
29   // the port is initialized with a peer, |connected| must be true. Otherwise it
30   // must be false.
31   //
32   // A MessagePipeDispatcher may not be transferred while in a disconnected
33   // state, and one can never return to a disconnected once connected.
34   //
35   // |pipe_id| is a unique identifier which can be used to track pipe endpoints
36   // as they're passed around. |endpoint| is either 0 or 1 and again is only
37   // used for tracking pipes (one side is always 0, the other is always 1.)
38   MessagePipeDispatcher(NodeController* node_controller,
39                         const ports::PortRef& port,
40                         uint64_t pipe_id,
41                         int endpoint);
42 
43   // Fuses this pipe with |other|. Returns |true| on success or |false| on
44   // failure. Regardless of the return value, both dispatchers are closed by
45   // this call.
46   bool Fuse(MessagePipeDispatcher* other);
47 
48   // Dispatcher:
49   Type GetType() const override;
50   MojoResult Close() override;
51   MojoResult WriteMessage(
52       std::unique_ptr<ports::UserMessageEvent> message) override;
53   MojoResult ReadMessage(
54       std::unique_ptr<ports::UserMessageEvent>* message) override;
55   MojoResult SetQuota(MojoQuotaType type, uint64_t limit) override;
56   MojoResult QueryQuota(MojoQuotaType type,
57                         uint64_t* limit,
58                         uint64_t* usage) override;
59   HandleSignalsState GetHandleSignalsState() const override;
60   MojoResult AddWatcherRef(const scoped_refptr<WatcherDispatcher>& watcher,
61                            uintptr_t context) override;
62   MojoResult RemoveWatcherRef(WatcherDispatcher* watcher,
63                               uintptr_t context) override;
64   void StartSerialize(uint32_t* num_bytes,
65                       uint32_t* num_ports,
66                       uint32_t* num_handles) override;
67   bool EndSerialize(void* destination,
68                     ports::PortName* ports,
69                     PlatformHandle* handles) override;
70   bool BeginTransit() override;
71   void CompleteTransitAndClose() override;
72   void CancelTransit() override;
73 
74   static scoped_refptr<Dispatcher> Deserialize(const void* data,
75                                                size_t num_bytes,
76                                                const ports::PortName* ports,
77                                                size_t num_ports,
78                                                PlatformHandle* handles,
79                                                size_t num_handles);
80 
81  private:
82   class PortObserverThunk;
83   friend class PortObserverThunk;
84 
85   ~MessagePipeDispatcher() override;
86 
87   MojoResult CloseNoLock();
88   HandleSignalsState GetHandleSignalsStateNoLock() const;
89   void OnPortStatusChanged();
90 
91   // These are safe to access from any thread without locking.
92   NodeController* const node_controller_;
93   const ports::PortRef port_;
94   const uint64_t pipe_id_;
95   const int endpoint_;
96 
97   // Guards access to all the fields below.
98   mutable base::Lock signal_lock_;
99 
100   // This is not the same is |port_transferred_|. It's only held true between
101   // BeginTransit() and Complete/CancelTransit().
102   AtomicFlag in_transit_;
103 
104   bool port_transferred_ = false;
105   AtomicFlag port_closed_;
106   WatcherSet watchers_;
107   base::Optional<uint64_t> receive_queue_length_limit_;
108   base::Optional<uint64_t> receive_queue_memory_size_limit_;
109 
110   DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
111 };
112 
113 }  // namespace core
114 }  // namespace mojo
115 
116 #endif  // MOJO_CORE_MESSAGE_PIPE_DISPATCHER_H_
117