1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SINGLE_STATE_QUEUE_H
18 #define SINGLE_STATE_QUEUE_H
19 
20 // Non-blocking single element state queue, or
21 // Non-blocking single-reader / single-writer multi-word atomic load / store
22 
23 #include <stdint.h>
24 #include <cutils/atomic.h>
25 
26 namespace android {
27 
28 template<typename T> class SingleStateQueue {
29 
30 public:
31 
32     class Mutator;
33     class Observer;
34 
35     enum SSQ_STATUS {
36         SSQ_PENDING, /* = 0 */
37         SSQ_READ,
38         SSQ_DONE,
39     };
40 
41     struct Shared {
42         // needs to be part of a union so don't define constructor or destructor
43 
44         friend class Mutator;
45         friend class Observer;
46 
47 private:
initShared48         void                init() { mAck = 0; mSequence = 0; }
49 
50         volatile int32_t    mAck;
51         volatile int32_t    mSequence;
52         T                   mValue;
53     };
54 
55     class Mutator {
56     public:
Mutator(Shared * shared)57         Mutator(Shared *shared)
58             : mSequence(0), mShared(shared)
59         {
60             // exactly one of Mutator and Observer must initialize, currently it is Observer
61             // shared->init();
62         }
63 
64         // push new value onto state queue, overwriting previous value;
65         // returns a sequence number which can be used with ack()
push(const T & value)66         int32_t push(const T& value)
67         {
68             Shared *shared = mShared;
69             int32_t sequence = mSequence;
70             sequence++;
71             android_atomic_acquire_store(sequence, &shared->mSequence);
72             shared->mValue = value;
73             sequence++;
74             android_atomic_release_store(sequence, &shared->mSequence);
75             mSequence = sequence;
76             // consider signalling a futex here, if we know that observer is waiting
77             return sequence;
78         }
79 
80         // returns the status of the last state push.  This may be a stale value.
81         //
82         // SSQ_PENDING, or 0, means it has not been observed
83         // SSQ_READ means it has been read
84         // SSQ_DONE means it has been acted upon, after Observer::done() is called
ack()85         enum SSQ_STATUS ack() const
86         {
87             // in the case of SSQ_DONE, prevent any subtle data-races of subsequent reads
88             // being performed (out-of-order) before the ack read, should the caller be
89             // depending on sequentiality of reads.
90             const int32_t ack = android_atomic_acquire_load(&mShared->mAck);
91             return ack - mSequence & ~1 ? SSQ_PENDING /* seq differ */ :
92                     ack & 1 ? SSQ_DONE : SSQ_READ;
93         }
94 
95         // return true if a push with specified sequence number or later has been observed
ack(int32_t sequence)96         bool ack(int32_t sequence) const
97         {
98             // this relies on 2's complement rollover to detect an ancient sequence number
99             return mShared->mAck - sequence >= 0;
100         }
101 
102     private:
103         int32_t     mSequence;
104         Shared * const mShared;
105     };
106 
107     class Observer {
108     public:
Observer(Shared * shared)109         Observer(Shared *shared)
110             : mSequence(0), mSeed(1), mShared(shared)
111         {
112             // exactly one of Mutator and Observer must initialize, currently it is Observer
113             shared->init();
114         }
115 
116         // return true if value has changed
poll(T & value)117         bool poll(T& value)
118         {
119             Shared *shared = mShared;
120             int32_t before = shared->mSequence;
121             if (before == mSequence) {
122                 return false;
123             }
124             for (int tries = 0; ; ) {
125                 const int MAX_TRIES = 5;
126                 if (before & 1) {
127                     if (++tries >= MAX_TRIES) {
128                         return false;
129                     }
130                     before = shared->mSequence;
131                 } else {
132                     android_memory_barrier();
133                     T temp = shared->mValue;
134                     int32_t after = android_atomic_release_load(&shared->mSequence);
135                     if (after == before) {
136                         value = temp;
137                         shared->mAck = before;
138                         mSequence = before; // mSequence is even after poll success
139                         return true;
140                     }
141                     if (++tries >= MAX_TRIES) {
142                         return false;
143                     }
144                     before = after;
145                 }
146             }
147         }
148 
149         // (optional) used to indicate to the Mutator that the state that has been polled
150         // has also been acted upon.
done()151         void done()
152         {
153             const int32_t ack = mShared->mAck + 1;
154             // ensure all previous writes have been performed.
155             android_atomic_release_store(ack, &mShared->mAck); // mSequence is odd after "done"
156         }
157 
158     private:
159         int32_t     mSequence;
160         int         mSeed;  // for PRNG
161         Shared * const mShared;
162     };
163 
164 #if 0
165     SingleStateQueue(void /*Shared*/ *shared);
166     /*virtual*/ ~SingleStateQueue() { }
167 
168     static size_t size() { return sizeof(Shared); }
169 #endif
170 
171 };
172 
173 }   // namespace android
174 
175 #endif  // SINGLE_STATE_QUEUE_H
176