1 /*
2  * Copyright 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 #include <atomic>
19 #include <optional>
20 
template <typename T>
// Multi-producer, single-consumer FIFO queue built from two singly linked lists.
//
// The two operations can be understood independently to see why they are free of races.
//
// push() maintains a linked list whose head is stored in mPush and may be called from multiple
// threads without a lock. If two threads never observe the same value from mPush.load(), the
// list behaves like an ordinary linked list. If two threads do observe the same head, one of
// them executes its compare_exchange first; the other one receives false from compare_exchange,
// which also refreshes previousHead with the most recent value of mPush, and that thread tries
// again. The same argument extends to an arbitrary number of producer threads.
//
// pop() must only be called from a single consumer thread. If mPop is empty (as it is
// initially), pop() atomically exchanges the whole push list with null. This is safe, since the
// only other reader of mPush (push) retries if mPush changes between its read and its atomic
// compare. The grabbed list is newest-first, so it is reversed to restore insertion order; the
// oldest element is returned and the rest is stored in mPop.
//
// If mPop already holds elements, pop() simply removes the head of that list.
//
// The destructor frees every element that has not been popped. It must not run concurrently
// with push() or pop().
class LocklessQueue {
public:
    // One node of the intrusive singly linked lists.
    class Entry {
    public:
        T mValue;
        // Next node in whichever list (push or pop) currently owns this entry.
        std::atomic<Entry*> mNext{nullptr};
        // Takes the value by value and moves it into place, so callers passing an rvalue
        // never pay for a copy and move-only types are supported.
        Entry(T value) : mValue(std::move(value)) {}
    };

    // Head of the list producers push onto (newest element first).
    std::atomic<Entry*> mPush = nullptr;
    // Head of the consumer-side list (oldest element first).
    std::atomic<Entry*> mPop = nullptr;

    LocklessQueue() = default;

    // The atomic members already make the queue non-copyable; spell it out because the queue
    // owns the heap-allocated entries.
    LocklessQueue(const LocklessQueue&) = delete;
    LocklessQueue& operator=(const LocklessQueue&) = delete;

    // Releases every entry still linked from either list so that destroying a non-empty queue
    // does not leak.
    ~LocklessQueue() {
        destroyList(mPush.exchange(nullptr));
        destroyList(mPop.exchange(nullptr));
    }

    // Returns true when both lists were observed empty. The two loads are separate atomic
    // operations, so with concurrent producers the result is only a snapshot.
    bool isEmpty() const { return (mPush.load() == nullptr) && (mPop.load() == nullptr); }

    // Appends value to the queue. Safe to call from multiple threads concurrently.
    void push(T value) {
        Entry* entry = new Entry(std::move(value));
        Entry* previousHead = mPush.load();
        do {
            // On a failed exchange previousHead has been refreshed with the current head, so
            // re-link the new entry in front of it before retrying.
            entry->mNext = previousHead;
        } while (!mPush.compare_exchange_weak(previousHead, entry));
    }

    // Removes and returns the oldest element, or std::nullopt if the queue is empty.
    // Must only be called from the single consumer thread.
    std::optional<T> pop() {
        Entry* popped = mPop.load();
        if (popped) {
            // Single consumer, so nobody else can modify mPop between the load and the store.
            mPop.store(popped->mNext.load());
            return takeValue(popped);
        }

        // The pop list is empty: take ownership of everything pushed so far.
        Entry* grabbedList = mPush.exchange(nullptr);
        if (!grabbedList) return std::nullopt;

        // Reverse the grabbed list (newest-first) into `popped` (oldest-first). The loop stops
        // on the last node, which is the oldest element and is returned directly instead of
        // being linked into the pop list.
        while (Entry* next = grabbedList->mNext.load()) {
            grabbedList->mNext = popped;
            popped = grabbedList;
            grabbedList = next;
        }
        mPop.store(popped);
        return takeValue(grabbedList);
    }

private:
    // Moves the value out of an already unlinked entry and frees the entry.
    static std::optional<T> takeValue(Entry* entry) {
        std::optional<T> result(std::move(entry->mValue));
        delete entry;
        return result;
    }

    // Frees every entry of the list starting at head.
    static void destroyList(Entry* head) {
        while (head) {
            Entry* next = head->mNext.load();
            delete head;
            head = next;
        }
    }
};
83