1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_CORE_DATA_PIPE_CONSUMER_DISPATCHER_H_
6 #define MOJO_CORE_DATA_PIPE_CONSUMER_DISPATCHER_H_
7 
8 #include <stddef.h>
9 #include <stdint.h>
10 
11 #include <memory>
12 
13 #include "base/macros.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/memory/shared_memory_mapping.h"
16 #include "base/memory/unsafe_shared_memory_region.h"
17 #include "base/synchronization/lock.h"
18 #include "mojo/core/dispatcher.h"
19 #include "mojo/core/ports/port_ref.h"
20 #include "mojo/core/system_impl_export.h"
21 #include "mojo/core/watcher_set.h"
22 
23 namespace mojo {
24 namespace core {
25 
26 class NodeController;
27 
28 // This is the Dispatcher implementation for the consumer handle for data
29 // pipes created by the Mojo primitive MojoCreateDataPipe(). This class is
30 // thread-safe.
31 class MOJO_SYSTEM_IMPL_EXPORT DataPipeConsumerDispatcher final
32     : public Dispatcher {
33  public:
34   static scoped_refptr<DataPipeConsumerDispatcher> Create(
35       NodeController* node_controller,
36       const ports::PortRef& control_port,
37       base::UnsafeSharedMemoryRegion shared_ring_buffer,
38       const MojoCreateDataPipeOptions& options,
39       uint64_t pipe_id);
40 
41   // Dispatcher:
42   Type GetType() const override;
43   MojoResult Close() override;
44   MojoResult ReadData(const MojoReadDataOptions& validated_options,
45                       void* elements,
46                       uint32_t* num_bytes) override;
47   MojoResult BeginReadData(const void** buffer,
48                            uint32_t* buffer_num_bytes) override;
49   MojoResult EndReadData(uint32_t num_bytes_read) override;
50   HandleSignalsState GetHandleSignalsState() const override;
51   MojoResult AddWatcherRef(const scoped_refptr<WatcherDispatcher>& watcher,
52                            uintptr_t context) override;
53   MojoResult RemoveWatcherRef(WatcherDispatcher* watcher,
54                               uintptr_t context) override;
55   void StartSerialize(uint32_t* num_bytes,
56                       uint32_t* num_ports,
57                       uint32_t* num_handles) override;
58   bool EndSerialize(void* destination,
59                     ports::PortName* ports,
60                     PlatformHandle* handles) override;
61   bool BeginTransit() override;
62   void CompleteTransitAndClose() override;
63   void CancelTransit() override;
64 
65   static scoped_refptr<DataPipeConsumerDispatcher> Deserialize(
66       const void* data,
67       size_t num_bytes,
68       const ports::PortName* ports,
69       size_t num_ports,
70       PlatformHandle* handles,
71       size_t num_handles);
72 
73  private:
74   class PortObserverThunk;
75   friend class PortObserverThunk;
76 
77   DataPipeConsumerDispatcher(NodeController* node_controller,
78                              const ports::PortRef& control_port,
79                              base::UnsafeSharedMemoryRegion shared_ring_buffer,
80                              const MojoCreateDataPipeOptions& options,
81                              uint64_t pipe_id);
82   ~DataPipeConsumerDispatcher() override;
83 
84   bool InitializeNoLock();
85   MojoResult CloseNoLock();
86   HandleSignalsState GetHandleSignalsStateNoLock() const;
87   void NotifyRead(uint32_t num_bytes);
88   void OnPortStatusChanged();
89   void UpdateSignalsStateNoLock();
90 
91   const MojoCreateDataPipeOptions options_;
92   NodeController* const node_controller_;
93   const ports::PortRef control_port_;
94   const uint64_t pipe_id_;
95 
96   // Guards access to the fields below.
97   mutable base::Lock lock_;
98 
99   WatcherSet watchers_;
100 
101   base::UnsafeSharedMemoryRegion shared_ring_buffer_;
102 
103   // We don't really write to it, and it's safe because we're the only consumer
104   // of this buffer.
105   base::WritableSharedMemoryMapping ring_buffer_mapping_;
106 
107   bool in_two_phase_read_ = false;
108   uint32_t two_phase_max_bytes_read_ = 0;
109 
110   bool in_transit_ = false;
111   bool is_closed_ = false;
112   bool peer_closed_ = false;
113   bool peer_remote_ = false;
114   bool transferred_ = false;
115 
116   uint32_t read_offset_ = 0;
117   uint32_t bytes_available_ = 0;
118 
119   // Indicates whether any new data is available since the last read attempt.
120   bool new_data_available_ = false;
121 
122   DISALLOW_COPY_AND_ASSIGN(DataPipeConsumerDispatcher);
123 };
124 
125 }  // namespace core
126 }  // namespace mojo
127 
128 #endif  // MOJO_CORE_DATA_PIPE_CONSUMER_DISPATCHER_H_
129