1 // Copyright (C) 2016 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #pragma once
15 
16 #include "aemu/base/Compiler.h"
17 #include "aemu/base/files/Stream.h"
18 #include "aemu/base/files/StreamSerializing.h"
19 #include "aemu/base/synchronization/ConditionVariable.h"
20 #include "aemu/base/synchronization/Lock.h"
21 
22 #include <iterator>
23 #include <vector>
24 #include <utility>
25 
26 #include <assert.h>
27 #include <stddef.h>
28 
29 namespace android {
30 namespace base {
31 
// Outcome of a BufferQueue operation. The numeric values are spelled out
// explicitly so that they stay stable if enumerators are ever reordered.
enum class BufferQueueResult {
    // Everything went well.
    Ok = 0,
    // The operation could not be performed right now and should be
    // tried again later.
    TryAgain = 1,
    // An error happened (i.e. the BufferQueue is closed).
    Error = 2,
    // An item could not be popped in time.
    Timeout = 3,
};
44 
45 // BufferQueue models a FIFO queue of <T> instances
46 // that can be used between two different threads. Note that it depends,
47 // for synchronization, on an external lock (passed as a reference in
48 // the BufferQueue constructor).
49 //
50 // This allows one to use multiple BufferQueue instances whose content
51 // are protected by a single lock.
52 template <class T>
53 class BufferQueue {
54     using ConditionVariable = android::base::ConditionVariable;
55     using Lock = android::base::Lock;
56     using AutoLock = android::base::AutoLock;
57 
58 public:
59     using value_type = T;
60 
61     // Constructor. |capacity| is the maximum number of T instances in
62     // the queue, and |lock| is a reference to an external lock provided by
63     // the caller.
BufferQueue(int capacity,android::base::Lock & lock)64     BufferQueue(int capacity, android::base::Lock& lock)
65         : mBuffers(capacity), mLock(lock) {}
66 
67     // Return true iff one can send a buffer to the queue, i.e. if it
68     // is not full or it would grow anyway.
canPushLocked()69     bool canPushLocked() const { return !mClosed && mCount < (int)mBuffers.size(); }
70 
71     // A blocking call that will wait until one can send a buffer to the queue.
waitUntilPushableLocked()72     void waitUntilPushableLocked() {
73         if (!canPushLocked()) {
74             mCanPush.wait(&mLock);
75         }
76     }
77 
78     // Return true iff one can receive a buffer from the queue, i.e. if
79     // it is not empty.
canPopLocked()80     bool canPopLocked() const { return mCount > 0; }
81 
82     // A blocking call that will wait until one can receive a buffer from the queue.
waitUntilPopableLocked()83     void waitUntilPopableLocked() {
84         if (!canPopLocked()) {
85             mCanPop.wait(&mLock);
86         }
87     }
88 
89     // Return true iff the queue is closed.
isClosedLocked()90     bool isClosedLocked() const { return mClosed; }
91 
92     // Changes the operation mode to snapshot or back. In snapshot mode
93     // BufferQueue accepts all write requests and accumulates the data, but
94     // returns error on all reads.
setSnapshotModeLocked(bool on)95     void setSnapshotModeLocked(bool on) {
96         mSnapshotMode = on;
97         if (on && !mClosed) {
98             wakeAllWaiters();
99         }
100     }
101 
102     // Try to send a buffer to the queue. On success, return BufferQueueResult::Ok
103     // and moves |buffer| to the queue. On failure, return
104     // BufferQueueResult::TryAgain if the queue was full, or BufferQueueResult::Error
105     // if it was closed.
106     // Note: in snapshot mode it never returns TryAgain, but grows the max
107     //   queue size instead.
tryPushLocked(T && buffer)108     BufferQueueResult tryPushLocked(T&& buffer) {
109         if (mClosed) {
110             return BufferQueueResult::Error;
111         }
112         if (mCount >= (int)mBuffers.size()) {
113             if (mSnapshotMode) {
114                 grow();
115             } else {
116                 return BufferQueueResult::TryAgain;
117             }
118         }
119         int pos = mPos + mCount;
120         if (pos >= (int)mBuffers.size()) {
121             pos -= mBuffers.size();
122         }
123         mBuffers[pos] = std::move(buffer);
124         if (mCount++ == 0) {
125             mCanPop.signal();
126         }
127         return BufferQueueResult::Ok;
128     }
129 
130     // Push a buffer to the queue. This is a blocking call. On success,
131     // move |buffer| into the queue and return BufferQueueResult::Ok. On failure,
132     // return BufferQueueResult::Error meaning the queue was closed.
pushLocked(T && buffer)133     BufferQueueResult pushLocked(T&& buffer) {
134         while (mCount == (int)mBuffers.size() && !mSnapshotMode) {
135             if (mClosed) {
136                 return BufferQueueResult::Error;
137             }
138             mCanPush.wait(&mLock);
139         }
140         return tryPushLocked(std::move(buffer));
141     }
142 
143     // Try to read a buffer from the queue. On success, moves item into
144     // |*buffer| and return BufferQueueResult::Ok. On failure, return BufferQueueResult::Error
145     // if the queue is empty and closed or in snapshot mode, and
146     // BufferQueueResult::TryAgain if it is empty but not closed.
tryPopLocked(T * buffer)147     BufferQueueResult tryPopLocked(T* buffer) {
148         if (mCount == 0) {
149             return (mClosed || mSnapshotMode) ? BufferQueueResult::Error
150                                               : BufferQueueResult::TryAgain;
151         }
152         *buffer = std::move(mBuffers[mPos]);
153         int pos = mPos + 1;
154         if (pos >= (int)mBuffers.size()) {
155             pos -= mBuffers.size();
156         }
157         mPos = pos;
158         if (mCount-- == (int)mBuffers.size()) {
159             mCanPush.signal();
160         }
161         return BufferQueueResult::Ok;
162     }
163 
164     // Pop a buffer from the queue. This is a blocking call. On success,
165     // move item into |*buffer| and return BufferQueueResult::Ok. On failure,
166     // return BufferQueueResult::Error to indicate the queue was closed or is in
167     // snapshot mode.
popLocked(T * buffer)168     BufferQueueResult popLocked(T* buffer) {
169         while (mCount == 0 && !mSnapshotMode) {
170             if (mClosed) {
171                 // Closed queue is empty.
172                 return BufferQueueResult::Error;
173             }
174             mCanPop.wait(&mLock);
175         }
176         return tryPopLocked(buffer);
177     }
178 
179     // Pop a buffer from the queue. This is a blocking call. On success,
180     // move item into |*buffer| and return BufferQueueResult::Ok. On failure,
181     // return BufferQueueResult::Error to indicate the queue was closed or is in
182     // snapshot mode. Returns BufferQueueResult::Timeout if we waited passed
183     // waitUntilUs.
popLockedBefore(T * buffer,uint64_t waitUntilUs)184     BufferQueueResult popLockedBefore(T* buffer, uint64_t waitUntilUs) {
185         while (mCount == 0 && !mSnapshotMode) {
186             if (mClosed) {
187                 // Closed queue is empty.
188                 return BufferQueueResult::Error;
189             }
190             if (!mCanPop.timedWait(&mLock, waitUntilUs)) {
191                 return BufferQueueResult::Timeout;
192             }
193 
194         }
195         return tryPopLocked(buffer);
196     }
197 
198     // Close the queue, it is no longer possible to push new items
199     // to it (i.e. push() will always return BufferQueueResult::Error), or to
200     // read from an empty queue (i.e. pop() will always return
201     // BufferQueueResult::Error once the queue becomes empty).
closeLocked()202     void closeLocked() {
203         mClosed = true;
204         wakeAllWaiters();
205     }
206 
207     // Save to a snapshot file
onSaveLocked(android::base::Stream * stream)208     void onSaveLocked(android::base::Stream* stream) {
209         stream->putByte(mClosed);
210         if (!mClosed) {
211             stream->putBe32(mCount);
212             for (int i = 0; i < mCount; i++) {
213                 android::base::saveBuffer(
214                         stream, mBuffers[(i + mPos) % mBuffers.size()]);
215             }
216         }
217     }
218 
onLoadLocked(android::base::Stream * stream)219     bool onLoadLocked(android::base::Stream* stream) {
220         mClosed = stream->getByte();
221         if (!mClosed) {
222             mCount = stream->getBe32();
223             if ((int)mBuffers.size() < mCount) {
224                 mBuffers.resize(mCount);
225             }
226             mPos = 0;
227             for (int i = 0; i < mCount; i++) {
228                 if (!android::base::loadBuffer(stream, &mBuffers[i])) {
229                     return false;
230                 }
231             }
232         }
233         return true;
234     }
235 
236 private:
grow()237     void grow() {
238         assert(mCount == (int)mBuffers.size());
239         std::vector<T> newBuffers;
240         newBuffers.reserve(mBuffers.size() * 2);
241         newBuffers.insert(newBuffers.end(),
242                           std::make_move_iterator(mBuffers.begin() + mPos),
243                           std::make_move_iterator(
244                                   mBuffers.begin() +
245                                   std::min<int>(mPos + mCount, mBuffers.size())));
246         newBuffers.insert(
247                 newBuffers.end(), std::make_move_iterator(mBuffers.begin()),
248                 std::make_move_iterator(mBuffers.begin() +
249                                         (mPos + mCount) % mBuffers.size()));
250         mBuffers = std::move(newBuffers);
251         mBuffers.resize(mBuffers.capacity());
252         mPos = 0;
253     }
254 
wakeAllWaiters()255     void wakeAllWaiters() {
256         if (mCount == (int)mBuffers.size()) {
257             mCanPush.broadcast();
258         }
259         if (mCount == 0) {
260             mCanPop.broadcast();
261         }
262     }
263 
264 private:
265     int mPos = 0;
266     int mCount = 0;
267     bool mClosed = false;
268     bool mSnapshotMode = false;
269     std::vector<T> mBuffers;
270 
271     Lock& mLock;
272     ConditionVariable mCanPush;
273     ConditionVariable mCanPop;
274 
275     DISALLOW_COPY_ASSIGN_AND_MOVE(BufferQueue);
276 };
277 
278 }  // namespace base
279 }  // namespace android
280