/*
 * Copyright 2014 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */

#include "SkOnce.h"
#include "SkSemaphore.h"
#include "SkSpinlock.h"
#include "SkTArray.h"
#include "SkTDArray.h"
#include "SkTaskGroup.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
static void query_num_cores(int* num_cores) {
    SYSTEM_INFO sysinfo;
    GetNativeSystemInfo(&sysinfo);
    *num_cores = sysinfo.dwNumberOfProcessors;
}
#else
#include <unistd.h>
static void query_num_cores(int* num_cores) {
    *num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
}
#endif

// We cache sk_num_cores() so we only query the OS once.
SK_DECLARE_STATIC_ONCE(g_query_num_cores_once);
int sk_num_cores() {
    static int num_cores = 0;
    SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores);
    SkASSERT(num_cores > 0);
    return num_cores;
}
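// (SkOnce makes this safe to call from multiple threads at once:
//  query_num_cores() runs exactly once, and every caller sees its result.)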

namespace {

class ThreadPool : SkNoncopyable {
public:
    // Enqueue fn on the global pool, bumping *pending, or just run it here if there is no pool.
    static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            return fn();
        }
        gGlobal->add(fn, pending);
    }

    // Like Add(), but enqueues fn(0) ... fn(N-1) as N separate units of work.
    static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn(i); }
            return;
        }
        gGlobal->batch(N, fn, pending);
    }

    // Block until *pending reaches zero.  Rather than sleep, the calling thread
    // helps drain the global work queue while it waits.
    static void Wait(SkAtomic<int32_t>* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
            return;
        }
        // Acquire pairs with decrement release here or in Loop.
        while (pending->load(sk_memory_order_acquire) > 0) {
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                // We're stealing work opportunistically,
                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
                // This means fWorkAvailable is only an upper bound on fWork.count().
                AutoLock lock(&gGlobal->fWorkLock);
                if (gGlobal->fWork.empty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                work = gGlobal->fWork.back();
                gGlobal->fWork.pop_back();
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
        ~AutoLock() { fLock->release(); }
    private:
        SkSpinlock* fLock;
    };

    struct Work {
        std::function<void(void)> fn; // A function to call,
        SkAtomic<int32_t>* pending;   // then decrement pending afterwards.
    };

    explicit ThreadPool(int threads) {
        if (threads == -1) {
            threads = sk_num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(new SkThread(&ThreadPool::Loop, this));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.empty());  // All SkTaskGroups should be destroyed by now.

        // Send a poison pill to each thread.
        SkAtomic<int32_t> dummy(0);
        for (int i = 0; i < fThreads.count(); i++) {
            this->add(nullptr, &dummy);
        }
        // Wait for them all to swallow the pill and die.
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.empty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        Work work = { fn, pending };
        pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            fWork.push_back(work);
        }
        fWorkAvailable.signal(1);
    }

    void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        pending->fetch_add(+N, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            for (int i = 0; i < N; i++) {
                Work work = { [i, fn]() { fn(i); }, pending };
                fWork.push_back(work);
            }
        }
        fWorkAvailable.signal(N);
    }

    // Each worker thread runs Loop(), pulling work until it dequeues a poison pill (a null fn).
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            // Sleep until there's work available, and claim one unit of Work as we wake.
            pool->fWorkAvailable.wait();
            {
                AutoLock lock(&pool->fWorkLock);
                if (pool->fWork.empty()) {
                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
                    // Well, that's fine, back to sleep for us.
                    continue;
                }
                work = pool->fWork.back();
                pool->fWork.pop_back();
            }
            if (!work.fn) {
                return;  // Poison pill.  Time... to die.
            }
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
        }
    }

    // fWorkLock must be held when reading or modifying fWork.
    SkSpinlock     fWorkLock;
    SkTArray<Work> fWork;

    // A thread-safe upper bound for fWork.count().
    //
    // We'd have it be an exact count but for the loop in Wait():
    // we never want that to block, so it can't call fWorkAvailable.wait(),
    // and that's the only way to decrement fWorkAvailable.
    // So fWorkAvailable may overcount the work actually available.
    // We make do, but this means some worker threads may wake spuriously.
    SkSemaphore    fWorkAvailable;

    // These are only changed in a single-threaded context.
    SkTDArray<SkThread*> fThreads;
    static ThreadPool*   gGlobal;

    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = nullptr;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == nullptr);
    if (threads != 0) {
        ThreadPool::gGlobal = new ThreadPool(threads);
    }
}

SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
    ThreadPool::Batch(N, fn, &fPending);
}
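
// Example usage (an illustrative sketch only, using just the APIs defined above;
// real call sites live elsewhere in Skia):
//
//     SkTaskGroup::Enabler enabled(-1);  // -1 = one worker thread per core (see ThreadPool ctor)
//
//     SkTaskGroup tg;
//     tg.add([] { /* some independent work */ });
//     tg.batch(100, [](int i) { /* work on item i */ });
//     tg.wait();  // Blocks, helping drain the queue, until all the work above has finished.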