#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

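    // Wait until *pending drops to zero.  Rather than blocking, the calling thread pitches in:
    // it pops and runs queued Work (not necessarily its own) until the count drains.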
    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

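    // threads == -1 requests one worker thread per logical core, as reported by num_cores().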
    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

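    // Bump *pending, queue one call to fn(arg), and wake one waiting worker.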
    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

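    // Bump *pending by N and queue N calls: fn(arg), fn((char*)arg + stride), ...,
    // fn((char*)arg + (N-1)*stride), all sharing one pending count.  broadcast() wakes
    // every worker, since there's likely work for more than one of them.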
    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

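    // Body of each worker thread: sleep until Work arrives (or we're draining), pop one item,
    // run it outside the lock, then decrement its pending count.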
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
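// The single global pool.  While it's NULL (the default), every SkTaskGroup call runs
// synchronously on the calling thread; SkTaskGroup::Enabler creates and destroys it.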
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
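
// A minimal usage sketch, kept inside a comment so it doesn't affect this translation unit.
// The square() task and example() function are hypothetical, not part of Skia; the
// Enabler/SkTaskGroup calls match the definitions above.
//
//     static void square(void* arg) { int* x = (int*)arg; *x *= *x; }
//
//     void example() {
//         SkTaskGroup::Enabler enabled(-1);  // -1: one worker per core; 0: run synchronously.
//
//         int values[100];
//         for (int i = 0; i < 100; i++) { values[i] = i; }
//
//         SkTaskGroup tg;
//         tg.batch(square, values, 100, sizeof(int));  // Calls square(&values[i]) for each i,
//                                                      // possibly on worker threads.
//         tg.wait();  // Helps run queued work until all 100 calls have finished.
//     }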