/*
 * Copyright 2014 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */

#include "SkOnce.h"
#include "SkSemaphore.h"
#include "SkSpinlock.h"
#include "SkTArray.h"
#include "SkTDArray.h"
#include "SkTaskGroup.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static void query_num_cores(int* num_cores) {
        SYSTEM_INFO sysinfo;
        GetNativeSystemInfo(&sysinfo);
        *num_cores = sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static void query_num_cores(int* num_cores) {
        *num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

// We cache sk_num_cores() so we only query the OS once.
SK_DECLARE_STATIC_ONCE(g_query_num_cores_once);
int sk_num_cores() {
    static int num_cores = 0;
    SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores);
    SkASSERT(num_cores > 0);
    return num_cores;
}
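
// Usage sketch (hypothetical, for illustration only): the first call pays for
// the OS query; every later call returns the cached value.
//
//     int n = sk_num_cores();          // Queries the OS exactly once.
//     SkASSERT(n == sk_num_cores());   // Subsequent calls hit the cache.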

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            return fn();
        }
        gGlobal->add(fn, pending);
    }

    static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn(i); }
            return;
        }
        gGlobal->batch(N, fn, pending);
    }

    static void Wait(SkAtomic<int32_t>* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
            return;
        }
        // Acquire pairs with decrement release here or in Loop.
        while (pending->load(sk_memory_order_acquire) > 0) {
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                // We're stealing work opportunistically,
                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
                // This means fWorkAvailable is only an upper bound on fWork.count().
                AutoLock lock(&gGlobal->fWorkLock);
                if (gGlobal->fWork.empty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                work = gGlobal->fWork.back();
                gGlobal->fWork.pop_back();
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
        ~AutoLock() { fLock->release(); }
    private:
        SkSpinlock* fLock;
    };

    struct Work {
        std::function<void(void)> fn; // A function to call
        SkAtomic<int32_t>* pending;   // then decrement pending afterwards.
    };

    explicit ThreadPool(int threads) {
        if (threads == -1) {
            threads = sk_num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(new SkThread(&ThreadPool::Loop, this));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.empty());  // All SkTaskGroups should be destroyed by now.

        // Send a poison pill to each thread.
        SkAtomic<int> dummy(0);
        for (int i = 0; i < fThreads.count(); i++) {
            this->add(nullptr, &dummy);
        }
        // Wait for them all to swallow the pill and die.
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.empty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        Work work = { fn, pending };
        pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            fWork.push_back(work);
        }
        fWorkAvailable.signal(1);
    }

    void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        pending->fetch_add(+N, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            for (int i = 0; i < N; i++) {
                Work work = { [i, fn]() { fn(i); }, pending };
                fWork.push_back(work);
            }
        }
        fWorkAvailable.signal(N);
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            // Sleep until there's work available, and claim one unit of Work as we wake.
            pool->fWorkAvailable.wait();
            {
                AutoLock lock(&pool->fWorkLock);
                if (pool->fWork.empty()) {
                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
                    // Well, that's fine, back to sleep for us.
                    continue;
                }
                work = pool->fWork.back();
                pool->fWork.pop_back();
            }
            if (!work.fn) {
                return;  // Poison pill.  Time... to die.
            }
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
        }
    }

    // fWorkLock must be held when reading or modifying fWork.
    SkSpinlock      fWorkLock;
    SkTArray<Work>  fWork;

    // A thread-safe upper bound for fWork.count().
    //
    // We'd have it be an exact count but for the loop in Wait():
    // we never want that to block, so it can't call fWorkAvailable.wait(),
    // and that's the only way to decrement fWorkAvailable.
    // So fWorkAvailable may overcount the work actually available.
    // We make do, but this means some worker threads may wake spuriously.
    SkSemaphore fWorkAvailable;

    // These are only changed in a single-threaded context.
    SkTDArray<SkThread*> fThreads;
    static ThreadPool* gGlobal;

    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = nullptr;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == nullptr);
    if (threads != 0) {
        ThreadPool::gGlobal = new ThreadPool(threads);
    }
}

SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
    ThreadPool::Batch(N, fn, &fPending);
}
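
// Usage sketch (hypothetical, for illustration only).  An Enabler must outlive
// every SkTaskGroup; passing -1 gives the pool one thread per core.  wait()
// both blocks on and helps finish outstanding work, so it always makes
// forward progress.
//
//     SkTaskGroup::Enabler enabled(-1);
//     SkTaskGroup tg;
//     tg.add([]() { /* one independent task */ });
//     tg.batch(100, [](int i) { /* task i, for i in [0,100) */ });
//     tg.wait();  // Returns once all 101 calls above have completed.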