1 // Copyright 2014 The Android Open Source Project 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #pragma once 16 17 #include "aemu/base/Optional.h" 18 #include "aemu/base/synchronization/AndroidConditionVariable.h" 19 #include "aemu/base/synchronization/AndroidLock.h" 20 21 #include <utility> 22 #include <stddef.h> 23 24 namespace gfxstream { 25 namespace guest { 26 27 // Base non-templated class used to reduce the amount of template 28 // specialization. 29 class MessageChannelBase { 30 public: 31 // Get the current channel size 32 size_t size() const; 33 34 // Abort the currently pending operations and don't allow any other ones 35 void stop(); 36 37 // Check if the channel is stopped. 38 bool isStopped() const; 39 40 // Block until the channel has no pending messages. 41 void waitForEmpty(); 42 43 protected: 44 // Constructor. |capacity| is the buffer capacity in messages. 45 MessageChannelBase(size_t capacity); 46 47 // Destructor. 48 ~MessageChannelBase() = default; 49 50 // Call this method in the sender thread before writing a new message. 51 // This returns the position of the available slot in the message array 52 // where to copy the new fixed-size message. After the copy, call 53 // afterWrite(). 54 // If the channel is stopped, return value is undefined. 55 size_t beforeWrite(); 56 57 // Same as beforeWrite(), but returns an empty optional if there was 58 // no room to write to instead of waiting for it. 59 // One still needs to call afterWrite() anyway. 60 Optional<size_t> beforeTryWrite(); 61 62 // To be called after trying to write a new fixed-size message (which should 63 // happen after beforeWrite() or beforeTryWrite()). 64 // |success| must be true to indicate that a new item was added to the 65 // channel, or false otherwise (i.e. if the channel is stopped, or if 66 // beforeTryWrite() returned an empty optional). 67 void afterWrite(bool success); 68 69 // Call this method in the receiver thread before reading a new message. 70 // This returns the position in the message array where the new message 71 // can be read. Caller must process the message, then call afterRead(). 72 // If the channel is stopped, return value is undefined. 73 size_t beforeRead(); 74 75 // Same as beforeRead(), but returns an empty optional if there was 76 // no data to read instead of waiting for it. 77 // One still needs to call afterWrite() anyway. 78 Optional<size_t> beforeTryRead(); 79 80 // Same as beforeRead(), but returns an empty optional if no data arrived 81 // by the |wallTimeUs| absolute time. One still needs to call 82 // afterWrite() anyway. 83 Optional<size_t> beforeTimedRead(uint64_t wallTimeUs); 84 85 // To be called after reading a fixed-size message from the channel (which 86 // must happen after beforeRead() or beforeTryRead()). 87 // |success| must be true to indicate that a message was read, or false 88 // otherwise (i.e. if the channel is stopped or if beforeTryRead() returned 89 // an empty optional). 90 void afterRead(bool success); 91 92 // A version of isStopped() that doesn't lock the channel but expects it 93 // to be locked by the caller. isStoppedLocked()94 bool isStoppedLocked() const { return mStopped; } 95 96 private: 97 size_t mPos = 0; 98 size_t mCapacity; 99 size_t mCount = 0; 100 bool mStopped = false; 101 mutable Lock mLock; // Mutable to allow const members to lock it. 102 ConditionVariable mCanRead; 103 ConditionVariable mCanWrite; 104 }; 105 106 // Helper class used to implement an uni-directional IPC channel between 107 // two threads. The channel can be used to send fixed-size messages of type 108 // |T|, with an internal buffer size of |CAPACITY| items. All calls are 109 // blocking. 110 // 111 // Usage is pretty straightforward: 112 // 113 // - From the sender thread, call send(msg); 114 // - From the receiver thread, call receive(&msg); 115 // - If you want to stop the IPC, call stop(); 116 template <typename T, size_t CAPACITY> 117 class MessageChannel : public MessageChannelBase { 118 public: MessageChannel()119 MessageChannel() : MessageChannelBase(CAPACITY) {} 120 send(const T & msg)121 bool send(const T& msg) { 122 const size_t pos = beforeWrite(); 123 const bool res = !isStoppedLocked(); 124 if (res) { 125 mItems[pos] = msg; 126 } 127 afterWrite(res); 128 return res; 129 } 130 send(T && msg)131 bool send(T&& msg) { 132 const size_t pos = beforeWrite(); 133 const bool res = !isStoppedLocked(); 134 if (res) { 135 mItems[pos] = std::move(msg); 136 } 137 afterWrite(res); 138 return res; 139 } 140 trySend(const T & msg)141 bool trySend(const T& msg) { 142 const auto pos = beforeTryWrite(); 143 if (pos) { 144 mItems[*pos] = msg; 145 } 146 afterWrite(pos); 147 return pos; 148 } 149 trySend(T && msg)150 bool trySend(T&& msg) { 151 const auto pos = beforeTryWrite(); 152 if (pos) { 153 mItems[*pos] = std::move(msg); 154 } 155 afterWrite(pos); 156 return pos; 157 } 158 receive(T * msg)159 bool receive(T* msg) { 160 const size_t pos = beforeRead(); 161 const bool res = !isStoppedLocked(); 162 if (res) { 163 *msg = std::move(mItems[pos]); 164 } 165 afterRead(res); 166 return res; 167 } 168 receive()169 Optional<T> receive() { 170 const size_t pos = beforeRead(); 171 if (!isStoppedLocked()) { 172 Optional<T> msg(std::move(mItems[pos])); 173 afterRead(true); 174 return msg; 175 } else { 176 afterRead(false); 177 return {}; 178 } 179 } 180 tryReceive(T * msg)181 bool tryReceive(T* msg) { 182 const auto pos = beforeTryRead(); 183 if (pos) { 184 *msg = std::move(mItems[*pos]); 185 } 186 afterRead(pos); 187 return pos; 188 } 189 timedReceive(uint64_t wallTimeUs)190 Optional<T> timedReceive(uint64_t wallTimeUs) { 191 const auto pos = beforeTimedRead(wallTimeUs); 192 if (pos && !isStoppedLocked()) { 193 Optional<T> res(std::move(mItems[*pos])); 194 afterRead(true); 195 return res; 196 } 197 afterRead(false); 198 return {}; 199 } 200 capacity()201 constexpr size_t capacity() const { return CAPACITY; } 202 203 private: 204 T mItems[CAPACITY]; 205 }; 206 207 } // namespace guest 208 } // namespace gfxstream 209