1 // Copyright 2014 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "aemu/base/synchronization/MessageChannel.h"
16 
17 namespace android {
18 namespace base {
19 
MessageChannelBase(size_t capacity)20 MessageChannelBase::MessageChannelBase(size_t capacity) : mCapacity(capacity) {}
21 
size() const22 size_t MessageChannelBase::size() const {
23     AutoLock lock(mLock);
24     return mCount;
25 }
26 
stop()27 void MessageChannelBase::stop() {
28     android::base::AutoLock lock(mLock);
29     mStopped = true;
30     mCount = 0;
31     mCanRead.broadcast();
32     mCanWrite.broadcastAndUnlock(&lock);
33 }
34 
isStopped() const35 bool MessageChannelBase::isStopped() const {
36     AutoLock lock(mLock);
37     return isStoppedLocked();
38 }
39 
waitForEmpty()40 void MessageChannelBase::waitForEmpty() {
41     AutoLock lock(mLock);
42     while (mCount > 0) {
43         mCanWrite.wait(&lock);
44     }
45 }
46 
beforeWrite()47 size_t MessageChannelBase::beforeWrite() {
48     mLock.lock();
49     while (mCount >= mCapacity && !mStopped) {
50         mCanWrite.wait(&mLock);
51     }
52     // Return value is undefined if stopped, so let's save a branch and skip the
53     // check for it.
54     size_t result = mPos + mCount;
55     if (result >= mCapacity) {
56         result -= mCapacity;
57     }
58     return result;
59 }
60 
beforeTryWrite()61 Optional<size_t> MessageChannelBase::beforeTryWrite() {
62     mLock.lock();
63 
64     if (mCount >= mCapacity || mStopped) {
65         return {};
66     }
67     size_t result = mPos + mCount;
68     if (result >= mCapacity) {
69         result -= mCapacity;
70     }
71     return result;
72 }
73 
afterWrite(bool success)74 void MessageChannelBase::afterWrite(bool success) {
75     if (success) {
76         ++mCount;
77     }
78     mCanRead.signalAndUnlock(&mLock);
79 }
80 
beforeRead()81 size_t MessageChannelBase::beforeRead() {
82     mLock.lock();
83     while (mCount == 0 && !mStopped) {
84         mCanRead.wait(&mLock);
85     }
86     return mPos; // return value is undefined if stopped, so let's save a branch
87 }
88 
beforeTryRead()89 Optional<size_t> MessageChannelBase::beforeTryRead() {
90     mLock.lock();
91 
92     if (mCount == 0 || mStopped) {
93         return {};
94     }
95     return mPos;
96 }
97 
beforeTimedRead(uint64_t wallTimeUs)98 Optional<size_t> MessageChannelBase::beforeTimedRead(
99         uint64_t wallTimeUs) {
100     mLock.lock();
101 
102     while (mCount == 0 && !mStopped) {
103         if (!mCanRead.timedWait(&mLock, wallTimeUs)) {
104             return {};
105         }
106     }
107     return mPos;
108 }
109 
afterRead(bool success)110 void MessageChannelBase::afterRead(bool success) {
111     if (success) {
112         if (++mPos == mCapacity) {
113             mPos = 0U;
114         }
115         --mCount;
116     }
117     mCanWrite.signalAndUnlock(&mLock);
118 }
119 
120 }  // namespace base
121 }  // namespace android
122