1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include "Stream.h"
20 
21 #include <condition_variable>
22 #include <future>
23 #include <list>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <unordered_set>
28 #include <vector>
29 
30 #include <utils/AndroidThreads.h>
31 
32 namespace android::soundpool {
33 
34 // TODO: Move helper classes to a utility file, with separate test.
35 
36 /**
37  * JavaThread is used like std::thread but for threads that may call the JVM.
38  *
39  * std::thread does not easily attach to the JVM.  We need JVM capable threads
40  * from createThreadEtc() since android binder call optimization may attempt to
41  * call back into Java if the SoundPool runs in system server.
42  *
43  *
44  * No locking is required - the member variables are inherently thread-safe.
45  */
46 class JavaThread {
47 public:
JavaThread(std::function<void ()> f,const char * name)48     JavaThread(std::function<void()> f, const char *name)
49         : mF{std::move(f)} {
50         createThreadEtc(staticFunction, this, name);
51     }
52 
53     JavaThread(JavaThread &&) = delete; // uses "this" ptr, not moveable.
54 
~JavaThread()55     ~JavaThread() {
56         join(); // manually block until the future is ready as std::future
57                 // destructor doesn't block unless it comes from std::async
58                 // and it is the last reference to shared state.
59     }
60 
join()61     void join() const {
62         mFuture.wait();
63     }
64 
isClosed()65     bool isClosed() const {
66         return mIsClosed;
67     }
68 
69 private:
staticFunction(void * data)70     static int staticFunction(void *data) {
71         JavaThread *jt = static_cast<JavaThread *>(data);
72         jt->mF();
73         jt->mIsClosed = true;  // set the flag that we are closed
74                                // now before we allow the destructor to execute;
75                                // otherwise there may be a use after free.
76         jt->mPromise.set_value();
77         return 0;
78     }
79 
80     // No locking is provided as these variables are initialized in the constructor
81     // and the members referenced are thread-safe objects.
82     // (mFuture.wait() can block multiple threads.)
83     // Note the order of member variables is reversed for destructor.
84     const std::function<void()> mF;
85     // Used in join() to block until the thread completes.
86     // See https://en.cppreference.com/w/cpp/thread/promise for the void specialization of
87     // promise.
88     std::promise<void>          mPromise;
89     std::future<void>           mFuture{mPromise.get_future()};
90     std::atomic_bool            mIsClosed = false;
91 };
92 
93 /**
94  * The ThreadPool manages thread lifetimes of SoundPool worker threads.
95  *
96  * TODO: the (eventual) goal of ThreadPool is to transparently and cooperatively
97  * maximize CPU utilization while avoiding starvation of other applications.
98  * Some possibilities:
99  *
100  * We should create worker threads when we have SoundPool work and the system is idle.
101  * CPU cycles are "use-it-or-lose-it" when the system is idle.
102  *
103  * We should adjust the priority of worker threads so that the second (and subsequent) worker
104  * threads have lower priority (should we try to promote priority also?).
105  *
106  * We should throttle the spawning of new worker threads, spacing over time, to avoid
107  * creating too many new threads all at once, on initialization.
108  */
109 class ThreadPool {
110 public:
ThreadPool(size_t maxThreadCount,std::string name)111     ThreadPool(size_t maxThreadCount, std::string name)
112         : mMaxThreadCount(maxThreadCount)
113         , mName{std::move(name)} { }
114 
~ThreadPool()115     ~ThreadPool() { quit(); }
116 
getActiveThreadCount()117     size_t getActiveThreadCount() const { return mActiveThreadCount; }
getMaxThreadCount()118     size_t getMaxThreadCount() const { return mMaxThreadCount; }
119 
quit()120     void quit() {
121         std::list<std::unique_ptr<JavaThread>> threads;
122         {
123             std::lock_guard lock(mThreadLock);
124             if (mQuit) return;  // already joined.
125             mQuit = true;
126             threads = std::move(mThreads);
127             mThreads.clear();
128         }
129         // mQuit set under lock, no more threads will be created.
130         for (auto &thread : threads) {
131             thread->join();
132             thread.reset();
133         }
134         LOG_ALWAYS_FATAL_IF(mActiveThreadCount != 0,
135                 "Invalid Active Threads: %zu", (size_t)mActiveThreadCount);
136     }
137 
138     // returns a non-zero id if successful, the id is to help logging messages.
launch(std::function<void (int32_t)> f)139     int32_t launch(std::function<void(int32_t /* id */)> f) {
140         std::list<std::unique_ptr<JavaThread>> threadsToRelease; // release outside of lock.
141         std::lock_guard lock(mThreadLock);
142         if (mQuit) return 0;  // ignore if we have quit
143 
144         // clean up threads.
145         for (auto it = mThreads.begin(); it != mThreads.end(); ) {
146             if ((*it)->isClosed()) {
147                 threadsToRelease.emplace_back(std::move(*it));
148                it = mThreads.erase(it);
149             } else {
150                ++it;
151             }
152         }
153 
154         const size_t threadCount = mThreads.size();
155         if (threadCount < mMaxThreadCount) {
156             // if the id wraps, we don't care about collisions.  it's just for logging.
157             mNextThreadId = mNextThreadId == INT32_MAX ? 1 : ++mNextThreadId;
158             const int32_t id = mNextThreadId;
159             mThreads.emplace_back(std::make_unique<JavaThread>(
160                     [this, id, mf = std::move(f)] { mf(id); --mActiveThreadCount; },
161                     (mName + std::to_string(id)).c_str()));
162             ++mActiveThreadCount;
163             return id;
164         }
165         return 0;
166     }
167 
168     // TODO: launch only if load average is low.
169     // This gets the load average
170     // See also std::thread::hardware_concurrency() for the concurrent capability.
getLoadAvg()171     static double getLoadAvg() {
172         double loadAvg[1];
173         if (getloadavg(loadAvg, std::size(loadAvg)) > 0) {
174             return loadAvg[0];
175         }
176         return -1.;
177     }
178 
179 private:
180     const size_t            mMaxThreadCount;
181     const std::string       mName;
182 
183     std::atomic_size_t      mActiveThreadCount = 0;
184 
185     std::mutex              mThreadLock;
186     bool                    mQuit GUARDED_BY(mThreadLock) = false;
187     int32_t                 mNextThreadId GUARDED_BY(mThreadLock) = 0;
188     std::list<std::unique_ptr<JavaThread>> mThreads GUARDED_BY(mThreadLock);
189 };
190 
191 /**
192  * A Perfect HashTable for IDs (key) to pointers (value).
193  *
194  * There are no collisions.  Why? because we generate the IDs for you to look up :-).
195  *
196  * The goal of this hash table is to map an integer ID handle > 0 to a pointer.
197  * We give these IDs in monotonic order (though we may skip if it were to cause a collision).
198  *
199  * The size of the hashtable must be large enough to accommodate the max number of keys.
200  * We suggest 2x.
201  *
 * Readers are lockless.
 * A single writer could be lockless, but we allow multiple writers through an internal lock.
204  *
205  * For the Key type K, valid keys generated are > 0 (signed or unsigned)
206  * For the Value type V, values are pointers - nullptr means empty.
207  */
template <typename K, typename V>
class PerfectHash {
public:
    /**
     * Creates a table with hashCapacity empty (nullptr) slots.
     *
     * @param hashCapacity number of slots.  A key is mapped to its slot with
     *        key & (hashCapacity - 1), so every slot is only reachable (and the
     *        index only stays in range) when hashCapacity is a power of two.
     */
    PerfectHash(size_t hashCapacity)
        : mHashCapacity(hashCapacity)
        , mK2V{std::make_unique<std::atomic<V>[]>(hashCapacity)} {
    }

    /**
     * Optionally releases oldKey, then stores value under a freshly generated key.
     *
     * We expect that the hashCapacity is 2x the number of stored keys in order
     * to have one or two tries to find an empty slot.
     *
     * @param value    pointer to store; nullptr means "only invalidate oldKey".
     * @param getKforV reports the key a stored value currently believes it has
     *                 (negative means the value is invalid).  It is called while
     *                 the internal lock is held.
     * @param oldKey   if > 0, the slot of this key is cleared when the value stored
     *                 there is invalid, still reports oldKey, or reports a key that
     *                 belongs to a different slot.
     * @return the new key (> 0), or 0 if value is nullptr.
     *
     * Aborts through LOG_ALWAYS_FATAL_IF if no reusable slot is found within
     * hashCapacity tries.
     */
    K generateKey(V value, std::function<K(V)> getKforV, K oldKey = 0) {
        std::lock_guard lock(mHashLock);
        // try to remove the old key.
        if (oldKey > 0) {  // key valid
            const V v = getValue(oldKey);
            if (v != nullptr) {  // value still valid
                const K atPosition = getKforV(v);
                if (atPosition < 0 ||            // invalid value
                        atPosition == oldKey ||  // value's key still valid and matches old key
                        ((atPosition ^ oldKey) & (mHashCapacity - 1)) != 0) { // stale key entry
                    getValue(oldKey) = nullptr;  // invalidate
                }
            } // else if value is invalid, no need to invalidate.
        }
        // check if we are invalidating only.
        if (value == nullptr) return 0;
        // now insert the new value and return the key.
        size_t tries = 0;
        for (; tries < mHashCapacity; ++tries) {
            // keys are handed out in increasing order and wrap to 1 (0 is never a key).
            mNextKey = mNextKey == std::numeric_limits<K>::max() ? 1 : mNextKey + 1;
            const V v = getValue(mNextKey);
            //ALOGD("tries: %zu, key:%d value:%p", tries, (int)mNextKey, v);
            if (v == nullptr) break; // empty
            const K atPosition = getKforV(v);
            //ALOGD("tries: %zu  key atPosition:%d", tries, (int)atPosition);
            if (atPosition < 0 || // invalid value
                    ((atPosition ^ mNextKey) & (mHashCapacity - 1)) != 0) { // stale key entry
                break;
            }
        }
        LOG_ALWAYS_FATAL_IF(tries == mHashCapacity, "hash table overflow!");
        //ALOGD("%s: found after %zu tries", __func__, tries);
        getValue(mNextKey) = value;
        return mNextKey;
    }

    // Returns the slot that key maps to.  All keys with the same low bits share a
    // slot, so a non-null value found here is not proof that it belongs to key;
    // the caller has to confirm that with the value itself.
    std::atomic<V> &getValue(K key) { return mK2V[key & (mHashCapacity - 1)]; }
    // Read-only variant of the above.  The slot type is std::atomic<V>, the same as
    // for the non-const overload.
    const std::atomic<V> &getValue(K key) const { return mK2V[key & (mHashCapacity - 1)]; }

private:
    mutable std::mutex          mHashLock;
    const size_t                mHashCapacity; // size of mK2V no lock needed.
    std::unique_ptr<std::atomic<V>[]> mK2V;    // no lock needed for read access.
    K                           mNextKey GUARDED_BY(mHashLock) {};
};
268 
269 /**
 * StreamMap contains all the valid streams available to SoundPool.
271  *
272  * There is no Lock required for this class because the streams are
273  * allocated in the constructor, the lookup is lockless, and the Streams
274  * returned are locked internally.
275  *
276  * The lookup uses a perfect hash.
277  * It is possible to use a lockless hash table or to use a stripe-locked concurrent
278  * hashmap for essentially lock-free lookup.
279  *
 * This follows the Map-Reduce parallelism model.
281  * https://en.wikipedia.org/wiki/MapReduce
282  *
283  * Conceivably the forEach could be parallelized using std::for_each with a
284  * std::execution::par policy.
285  *
286  * https://en.cppreference.com/w/cpp/algorithm/for_each
287  */
288 class StreamMap {
289 public:
290     explicit StreamMap(int32_t streams);
291 
292     // Returns the stream associated with streamID or nullptr if not found.
293     // This need not be locked.
294     // The stream ID will never migrate to another Stream, but it may change
295     // underneath you.  The Stream operations that take a streamID will confirm
296     // that the streamID matches under the Stream lock before executing otherwise
297     // it ignores the command as stale.
298     Stream* findStream(int32_t streamID) const;
299 
300     // Iterates through the stream pool applying the function f.
301     // Since this enumerates over every single stream, it is unlocked.
302     //
303     // See related: https://en.cppreference.com/w/cpp/algorithm/for_each
forEach(std::function<void (const Stream *)> f)304     void forEach(std::function<void(const Stream *)>f) const {
305         for (size_t i = 0; i < mStreamPoolSize; ++i) {
306             f(&mStreamPool[i]);
307         }
308     }
309 
forEach(std::function<void (Stream *)> f)310     void forEach(std::function<void(Stream *)>f) {
311         for (size_t i = 0; i < mStreamPoolSize; ++i) {
312             f(&mStreamPool[i]);
313         }
314     }
315 
316     // Returns the pair stream for a given Stream.
317     // This need not be locked as it is a property of the pointer address.
getPairStream(const Stream * stream)318     Stream* getPairStream(const Stream* stream) const {
319         const size_t index = streamPosition(stream);
320         return &mStreamPool[index ^ 1];
321     }
322 
323     // find the position of the stream in mStreamPool array.
324     size_t streamPosition(const Stream* stream) const; // no lock needed
325 
getStreamMapSize()326     size_t getStreamMapSize() const {
327         return mStreamPoolSize;
328     }
329 
330     // find the next valid ID for a stream and store in hash table.
331     int32_t getNextIdForStream(Stream* stream) const;
332 
333 private:
334 
335     // use the hash table to attempt to find the stream.
336     // nullptr is returned if the lookup fails.
337     Stream* lookupStreamFromId(int32_t streamID) const;
338 
339     // The stream pool is initialized in the constructor, effectively const.
340     // no locking required for access.
341     //
342     // The constructor parameter "streams" results in streams pairs of streams.
343     // We have twice as many streams because we wish to return a streamID "handle"
344     // back to the app immediately, while we may be stopping the other stream in the
345     // pair to get its AudioTrack :-).
346     //
347     // Of the stream pair, only one of the streams may have an AudioTrack.
348     // The fixed association of a stream pair allows callbacks from the AudioTrack
349     // to be associated properly to either one or the other of the stream pair.
350     //
351     // TODO: The stream pair arrangement can be removed if we have better AudioTrack
352     // callback handling (being able to remove and change the callback after construction).
353     //
354     // Streams may be accessed anytime off of the stream pool
355     // as there is internal locking on each stream.
356     std::unique_ptr<Stream[]>   mStreamPool;        // no lock needed for access.
357     size_t                      mStreamPoolSize;    // no lock needed for access.
358 
359     // In order to find the Stream from a StreamID, we could do a linear lookup in mStreamPool.
360     // As an alternative, one could use stripe-locked or lock-free concurrent hashtables.
361     //
362     // When considering linear search vs hashmap, verify the typical use-case size.
363     // Linear search is faster than std::unordered_map (circa 2018) for less than 40 elements.
364     // [ Skarupke, M. (2018), "You Can Do Better than std::unordered_map: New and Recent
365     // Improvements to Hash Table Performance." C++Now 2018. cppnow.org, see
366     // https://www.youtube.com/watch?v=M2fKMP47slQ ]
367     //
368     // Here, we use a PerfectHash of Id to Stream *, since we can control the
369     // StreamID returned to the user.  This allows O(1) read access to mStreamPool lock-free.
370     //
371     // We prefer that the next stream ID is monotonic for aesthetic reasons
372     // (if we didn't care about monotonicity, a simple method is to apply a generation count
373     // to each stream in the unused upper bits of its index in mStreamPool for the id).
374     //
375     std::unique_ptr<PerfectHash<int32_t, Stream *>> mPerfectHash;
376 };
377 
378 /**
379  * StreamManager is used to manage the streams (accessed by StreamID from Java).
380  *
381  * Locking order (proceeds from application to component).
382  *  SoundPool mApiLock (if needed) -> StreamManager mStreamManagerLock
383  *                                 -> pair Stream mLock -> queued Stream mLock
384  */
385 class StreamManager : public StreamMap {
386 public:
387     // Note: the SoundPool pointer is only used for stream initialization.
388     // It is not stored in StreamManager.
389     StreamManager(int32_t streams, size_t threads, const audio_attributes_t* attributes);
390     ~StreamManager();
391 
392     // Returns positive streamID on success, 0 on failure.  This is locked.
393     int32_t queueForPlay(const std::shared_ptr<Sound> &sound,
394             int32_t soundID, float leftVolume, float rightVolume,
395             int32_t priority, int32_t loop, float rate)
396             NO_THREAD_SAFETY_ANALYSIS; // uses unique_lock
397 
398     ///////////////////////////////////////////////////////////////////////
399     // Called from soundpool::Stream
400 
getAttributes()401     const audio_attributes_t* getAttributes() const { return &mAttributes; }
402 
403     // Moves the stream to the restart queue (called upon BUFFER_END of the static track)
404     // this is locked internally.
405     // If activeStreamIDToMatch is nonzero, it will only move to the restart queue
406     // if the streamIDToMatch is found on the active queue.
407     void moveToRestartQueue(Stream* stream, int32_t activeStreamIDToMatch = 0);
408 
409 private:
410 
411     void run(int32_t id) NO_THREAD_SAFETY_ANALYSIS; // worker thread, takes unique_lock.
412     void dump() const;                           // no lock needed
413 
414     // returns true if more worker threads are needed.
needMoreThreads_l()415     bool needMoreThreads_l() REQUIRES(mStreamManagerLock) {
416         return mRestartStreams.size() > 0 &&
417                 (mThreadPool->getActiveThreadCount() == 0
418                 || std::distance(mRestartStreams.begin(),
419                         mRestartStreams.upper_bound(systemTime()))
420                         > (ptrdiff_t)mThreadPool->getActiveThreadCount());
421     }
422 
423     // returns true if the stream was added.
424     bool moveToRestartQueue_l(
425             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
426     // returns number of queues the stream was removed from (should be 0 or 1);
427     // a special code of -1 is returned if activeStreamIDToMatch is > 0 and
428     // the stream wasn't found on the active queue.
429     ssize_t removeFromQueues_l(
430             Stream* stream, int32_t activeStreamIDToMatch = 0) REQUIRES(mStreamManagerLock);
431     void addToRestartQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
432     void addToActiveQueue_l(Stream *stream) REQUIRES(mStreamManagerLock);
433     void sanityCheckQueue_l() const REQUIRES(mStreamManagerLock);
434 
435     const audio_attributes_t mAttributes;
436     std::unique_ptr<ThreadPool> mThreadPool;                  // locked internally
437 
438     // mStreamManagerLock is used to lock access for transitions between the
439     // 4 stream queues by the Manager Thread or by the user initiated play().
440     // A stream pair has exactly one stream on exactly one of the queues.
441     std::mutex                  mStreamManagerLock;
442     std::condition_variable     mStreamManagerCondition GUARDED_BY(mStreamManagerLock);
443 
444     bool                        mQuit GUARDED_BY(mStreamManagerLock) = false;
445 
446     // There are constructor arg "streams" pairs of streams, only one of each
447     // pair on the 4 stream queues below.  The other stream in the pair serves as
448     // placeholder to accumulate user changes, pending actual availability of the
449     // AudioTrack, as it may be in use, requiring stop-then-restart.
450     //
451     // The 4 queues are implemented in the appropriate STL container based on perceived
452     // optimality.
453 
454     // 1) mRestartStreams: Streams awaiting stop.
455     // The paired stream may be active (but with no AudioTrack), and will be restarted
456     // with an active AudioTrack when the current stream is stopped.
457     std::multimap<int64_t /* stopTimeNs */, Stream*>
458                                 mRestartStreams GUARDED_BY(mStreamManagerLock);
459 
460     // 2) mActiveStreams: Streams that are active.
461     // The paired stream will be inactive.
462     // This is in order of specified by kStealActiveStream_OldestFirst
463     std::list<Stream*>          mActiveStreams GUARDED_BY(mStreamManagerLock);
464 
465     // 3) mAvailableStreams: Streams that are inactive.
466     // The paired stream will also be inactive.
467     // No particular order.
468     std::unordered_set<Stream*> mAvailableStreams GUARDED_BY(mStreamManagerLock);
469 
470     // 4) mProcessingStreams: Streams that are being processed by the ManagerThreads
471     // When on this queue, the stream and its pair are not available for stealing.
472     // Each ManagerThread will have at most one stream on the mProcessingStreams queue.
473     // The paired stream may be active or restarting.
474     // No particular order.
475     std::unordered_set<Stream*> mProcessingStreams GUARDED_BY(mStreamManagerLock);
476 };
477 
478 } // namespace android::soundpool
479