1 // Copyright 2014 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include "aemu/base/Optional.h"
18 #include "aemu/base/synchronization/AndroidConditionVariable.h"
19 #include "aemu/base/synchronization/AndroidLock.h"
20 
21 #include <utility>
22 #include <stddef.h>
23 
24 namespace gfxstream {
25 namespace guest {
26 
27 // Base non-templated class used to reduce the amount of template
28 // specialization.
29 class MessageChannelBase {
30 public:
31     // Get the current channel size
32     size_t size() const;
33 
34     // Abort the currently pending operations and don't allow any other ones
35     void stop();
36 
37     // Check if the channel is stopped.
38     bool isStopped() const;
39 
40     // Block until the channel has no pending messages.
41     void waitForEmpty();
42 
43 protected:
44     // Constructor. |capacity| is the buffer capacity in messages.
45     MessageChannelBase(size_t capacity);
46 
47     // Destructor.
48     ~MessageChannelBase() = default;
49 
50     // Call this method in the sender thread before writing a new message.
51     // This returns the position of the available slot in the message array
52     // where to copy the new fixed-size message. After the copy, call
53     // afterWrite().
54     // If the channel is stopped, return value is undefined.
55     size_t beforeWrite();
56 
57     // Same as beforeWrite(), but returns an empty optional if there was
58     // no room to write to instead of waiting for it.
59     // One still needs to call afterWrite() anyway.
60     Optional<size_t> beforeTryWrite();
61 
62     // To be called after trying to write a new fixed-size message (which should
63     // happen after beforeWrite() or beforeTryWrite()).
64     // |success| must be true to indicate that a new item was added to the
65     // channel, or false otherwise (i.e. if the channel is stopped, or if
66     // beforeTryWrite() returned an empty optional).
67     void afterWrite(bool success);
68 
69     // Call this method in the receiver thread before reading a new message.
70     // This returns the position in the message array where the new message
71     // can be read. Caller must process the message, then call afterRead().
72     // If the channel is stopped, return value is undefined.
73     size_t beforeRead();
74 
75     // Same as beforeRead(), but returns an empty optional if there was
76     // no data to read instead of waiting for it.
77     // One still needs to call afterWrite() anyway.
78     Optional<size_t> beforeTryRead();
79 
80     // Same as beforeRead(), but returns an empty optional if no data arrived
81     // by the |wallTimeUs| absolute time. One still needs to call
82     // afterWrite() anyway.
83     Optional<size_t> beforeTimedRead(uint64_t wallTimeUs);
84 
85     // To be called after reading a fixed-size message from the channel (which
86     // must happen after beforeRead() or beforeTryRead()).
87     // |success| must be true to indicate that a message was read, or false
88     // otherwise (i.e. if the channel is stopped or if beforeTryRead() returned
89     // an empty optional).
90     void afterRead(bool success);
91 
92     // A version of isStopped() that doesn't lock the channel but expects it
93     // to be locked by the caller.
isStoppedLocked()94     bool isStoppedLocked() const { return mStopped; }
95 
96 private:
97     size_t mPos = 0;
98     size_t mCapacity;
99     size_t mCount = 0;
100     bool mStopped = false;
101     mutable Lock mLock;     // Mutable to allow const members to lock it.
102     ConditionVariable mCanRead;
103     ConditionVariable mCanWrite;
104 };
105 
106 // Helper class used to implement an uni-directional IPC channel between
107 // two threads. The channel can be used to send fixed-size messages of type
108 // |T|, with an internal buffer size of |CAPACITY| items. All calls are
109 // blocking.
110 //
111 // Usage is pretty straightforward:
112 //
113 //   - From the sender thread, call send(msg);
114 //   - From the receiver thread, call receive(&msg);
115 //   - If you want to stop the IPC, call stop();
116 template <typename T, size_t CAPACITY>
117 class MessageChannel : public MessageChannelBase {
118 public:
MessageChannel()119     MessageChannel() : MessageChannelBase(CAPACITY) {}
120 
send(const T & msg)121     bool send(const T& msg) {
122         const size_t pos = beforeWrite();
123         const bool res = !isStoppedLocked();
124         if (res) {
125             mItems[pos] = msg;
126         }
127         afterWrite(res);
128         return res;
129     }
130 
send(T && msg)131     bool send(T&& msg) {
132         const size_t pos = beforeWrite();
133         const bool res = !isStoppedLocked();
134         if (res) {
135             mItems[pos] = std::move(msg);
136         }
137         afterWrite(res);
138         return res;
139     }
140 
trySend(const T & msg)141     bool trySend(const T& msg) {
142         const auto pos = beforeTryWrite();
143         if (pos) {
144             mItems[*pos] = msg;
145         }
146         afterWrite(pos);
147         return pos;
148     }
149 
trySend(T && msg)150     bool trySend(T&& msg) {
151         const auto pos = beforeTryWrite();
152         if (pos) {
153             mItems[*pos] = std::move(msg);
154         }
155         afterWrite(pos);
156         return pos;
157     }
158 
receive(T * msg)159     bool receive(T* msg) {
160         const size_t pos = beforeRead();
161         const bool res = !isStoppedLocked();
162         if (res) {
163             *msg = std::move(mItems[pos]);
164         }
165         afterRead(res);
166         return res;
167     }
168 
receive()169     Optional<T> receive() {
170         const size_t pos = beforeRead();
171         if (!isStoppedLocked()) {
172             Optional<T> msg(std::move(mItems[pos]));
173             afterRead(true);
174             return msg;
175         } else {
176             afterRead(false);
177             return {};
178         }
179     }
180 
tryReceive(T * msg)181     bool tryReceive(T* msg) {
182         const auto pos = beforeTryRead();
183         if (pos) {
184             *msg = std::move(mItems[*pos]);
185         }
186         afterRead(pos);
187         return pos;
188     }
189 
timedReceive(uint64_t wallTimeUs)190     Optional<T> timedReceive(uint64_t wallTimeUs) {
191         const auto pos = beforeTimedRead(wallTimeUs);
192         if (pos && !isStoppedLocked()) {
193             Optional<T> res(std::move(mItems[*pos]));
194             afterRead(true);
195             return res;
196         }
197         afterRead(false);
198         return {};
199     }
200 
capacity()201     constexpr size_t capacity() const { return CAPACITY; }
202 
203 private:
204     T mItems[CAPACITY];
205 };
206 
207 } // namespace guest
208 } // namespace gfxstream
209