#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

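// num_cores() supplies the default level of parallelism: the ThreadPool
// constructor below maps a requested thread count of -1 to this value.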
#if defined(SK_BUILD_FOR_WIN32)
    #include <windows.h>  // For SYSTEM_INFO and GetSystemInfo().
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return (int) sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

namespace {

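// The global ThreadPool backing every SkTaskGroup.  gGlobal stays NULL until
// SkTaskGroup::Enabler installs a pool; each static entry point below falls
// back to running the work synchronously on the calling thread when no pool
// exists.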
class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

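    // Wait() does more than block: while the group's pending count is
    // nonzero, the calling thread pops queued work and runs it itself, so a
    // waiter always makes forward progress instead of idling.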
    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

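    // Shutdown: flag fDraining under the lock, wake every sleeping worker so
    // it can observe the flag, then join the threads before freeing them.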
    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

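    // add() and batch() bump *pending before queuing the work, so a matching
    // Wait() can never see the count reach zero while work is outstanding.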
    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();
        }
    }

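    // Each worker thread runs Loop(): sleep on fReady until work arrives (or
    // fDraining says we're shutting down), pop one item, run it, repeat.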
    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

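// Enabler installs the global pool for its lifetime.  Passing 0 threads, or
// running where SkCondVar isn't supported, leaves gGlobal NULL, which makes
// every SkTaskGroup run its work synchronously.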
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
}

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
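
// A minimal usage sketch (illustrative only; square() and the argument values
// are hypothetical, not part of this file):
//
//     static void square(void* arg) {
//         int* x = static_cast<int*>(arg);
//         *x *= *x;
//     }
//
//     void example() {
//         SkTaskGroup::Enabler enabled(-1);    // One thread per core, for this scope.
//         int values[] = { 1, 2, 3, 4 };
//         SkTaskGroup group;
//         group.batch(square, values, 4, sizeof(int));
//         group.wait();                        // All four squares are done here.
//     }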