1 ///////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2005, Industrial Light & Magic, a division of Lucas
4 // Digital Ltd. LLC
5 //
6 // All rights reserved.
7 //
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are
10 // met:
11 // * Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // * Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 // * Neither the name of Industrial Light & Magic nor the names of
18 // its contributors may be used to endorse or promote products derived
19 // from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 ///////////////////////////////////////////////////////////////////////////
34
35 //-----------------------------------------------------------------------------
36 //
37 // class Task, class ThreadPool, class TaskGroup
38 //
39 //-----------------------------------------------------------------------------
40
41 #include "IlmThread.h"
42 #include "IlmThreadMutex.h"
43 #include "IlmThreadSemaphore.h"
44 #include "IlmThreadPool.h"
45 #include "Iex.h"
46 #include <list>
47
48 using namespace std;
49
50 namespace IlmThread {
51 namespace {
52
53 class WorkerThread: public Thread
54 {
55 public:
56
57 WorkerThread (ThreadPool::Data* data);
58
59 virtual void run ();
60
61 private:
62
63 ThreadPool::Data * _data;
64 };
65
66 } //namespace
67
68
69 struct TaskGroup::Data
70 {
71 Data ();
72 ~Data ();
73
74 void addTask () ;
75 void removeTask ();
76
77 Semaphore isEmpty; // used to signal that the taskgroup is empty
78 int numPending; // number of pending tasks to still execute
79 };
80
81
82 struct ThreadPool::Data
83 {
84 Data ();
85 ~Data();
86
87 void finish ();
88 bool stopped () const;
89 void stop ();
90
91 Semaphore taskSemaphore; // threads wait on this for ready tasks
92 Mutex taskMutex; // mutual exclusion for the tasks list
93 list<Task*> tasks; // the list of tasks to execute
94 size_t numTasks; // fast access to list size
95 // (list::size() can be O(n))
96
97 Semaphore threadSemaphore; // signaled when a thread starts executing
98 Mutex threadMutex; // mutual exclusion for threads list
99 list<WorkerThread*> threads; // the list of all threads
100 size_t numThreads; // fast access to list size
101
102 bool stopping; // flag indicating whether to stop threads
103 Mutex stopMutex; // mutual exclusion for stopping flag
104 };
105
106
107
108 //
109 // class WorkerThread
110 //
111
WorkerThread(ThreadPool::Data * data)112 WorkerThread::WorkerThread (ThreadPool::Data* data):
113 _data (data)
114 {
115 start();
116 }
117
118
119 void
run()120 WorkerThread::run ()
121 {
122 //
123 // Signal that the thread has started executing
124 //
125
126 _data->threadSemaphore.post();
127
128 while (true)
129 {
130 //
131 // Wait for a task to become available
132 //
133
134 _data->taskSemaphore.wait();
135
136 {
137 Lock taskLock (_data->taskMutex);
138
139 //
140 // If there is a task pending, pop off the next task in the FIFO
141 //
142
143 if (_data->numTasks > 0)
144 {
145 Task* task = _data->tasks.front();
146 TaskGroup* taskGroup = task->group();
147 _data->tasks.pop_front();
148 _data->numTasks--;
149
150 taskLock.release();
151 task->execute();
152 taskLock.acquire();
153
154 delete task;
155 taskGroup->_data->removeTask();
156 }
157 else if (_data->stopped())
158 {
159 break;
160 }
161 }
162 }
163 }
164
165
166 //
167 // struct TaskGroup::Data
168 //
169
Data()170 TaskGroup::Data::Data (): isEmpty (1), numPending (0)
171 {
172 // empty
173 }
174
175
~Data()176 TaskGroup::Data::~Data ()
177 {
178 //
179 // A TaskGroup acts like an "inverted" semaphore: if the count
180 // is above 0 then waiting on the taskgroup will block. This
181 // destructor waits until the taskgroup is empty before returning.
182 //
183
184 isEmpty.wait ();
185 }
186
187
188 void
addTask()189 TaskGroup::Data::addTask ()
190 {
191 //
192 // Any access to the taskgroup is protected by a mutex that is
193 // held by the threadpool. Therefore it is safe to access
194 // numPending before we wait on the semaphore.
195 //
196
197 if (numPending++ == 0)
198 isEmpty.wait ();
199 }
200
201
202 void
removeTask()203 TaskGroup::Data::removeTask ()
204 {
205 if (--numPending == 0)
206 isEmpty.post ();
207 }
208
209
210 //
211 // struct ThreadPool::Data
212 //
213
Data()214 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
215 {
216 // empty
217 }
218
219
~Data()220 ThreadPool::Data::~Data()
221 {
222 Lock lock (threadMutex);
223 finish ();
224 }
225
226
227 void
finish()228 ThreadPool::Data::finish ()
229 {
230 stop();
231
232 //
233 // Signal enough times to allow all threads to stop.
234 //
235 // Wait until all threads have started their run functions.
236 // If we do not wait before we destroy the threads then it's
237 // possible that the threads have not yet called their run
238 // functions.
239 // If this happens then the run function will be called off
240 // of an invalid object and we will crash, most likely with
241 // an error like: "pure virtual method called"
242 //
243
244 for (size_t i = 0; i < numThreads; i++)
245 {
246 taskSemaphore.post();
247 threadSemaphore.wait();
248 }
249
250 //
251 // Join all the threads
252 //
253
254 for (list<WorkerThread*>::iterator i = threads.begin();
255 i != threads.end();
256 ++i)
257 {
258 delete (*i);
259 }
260
261 Lock lock1 (taskMutex);
262 Lock lock2 (stopMutex);
263 threads.clear();
264 tasks.clear();
265 numThreads = 0;
266 numTasks = 0;
267 stopping = false;
268 }
269
270
271 bool
stopped() const272 ThreadPool::Data::stopped () const
273 {
274 Lock lock (stopMutex);
275 return stopping;
276 }
277
278
279 void
stop()280 ThreadPool::Data::stop ()
281 {
282 Lock lock (stopMutex);
283 stopping = true;
284 }
285
286
287 //
288 // class Task
289 //
290
Task(TaskGroup * g)291 Task::Task (TaskGroup* g): _group(g)
292 {
293 // empty
294 }
295
296
~Task()297 Task::~Task()
298 {
299 // empty
300 }
301
302
303 TaskGroup*
group()304 Task::group ()
305 {
306 return _group;
307 }
308
309
TaskGroup()310 TaskGroup::TaskGroup ():
311 _data (new Data())
312 {
313 // empty
314 }
315
316
~TaskGroup()317 TaskGroup::~TaskGroup ()
318 {
319 delete _data;
320 }
321
322
323 //
324 // class ThreadPool
325 //
326
ThreadPool(unsigned nthreads)327 ThreadPool::ThreadPool (unsigned nthreads):
328 _data (new Data())
329 {
330 setNumThreads (nthreads);
331 }
332
333
~ThreadPool()334 ThreadPool::~ThreadPool ()
335 {
336 delete _data;
337 }
338
339
340 int
numThreads() const341 ThreadPool::numThreads () const
342 {
343 Lock lock (_data->threadMutex);
344 return _data->numThreads;
345 }
346
347
348 void
setNumThreads(int count)349 ThreadPool::setNumThreads (int count)
350 {
351 if (count < 0)
352 throw Iex::ArgExc ("Attempt to set the number of threads "
353 "in a thread pool to a negative value.");
354
355 //
356 // Lock access to thread list and size
357 //
358
359 Lock lock (_data->threadMutex);
360
361 if ((size_t)count > _data->numThreads)
362 {
363 //
364 // Add more threads
365 //
366
367 while (_data->numThreads < (size_t)count)
368 {
369 _data->threads.push_back (new WorkerThread (_data));
370 _data->numThreads++;
371 }
372 }
373 else if ((size_t)count < _data->numThreads)
374 {
375 //
376 // Wait until all existing threads are finished processing,
377 // then delete all threads.
378 //
379
380 _data->finish ();
381
382 //
383 // Add in new threads
384 //
385
386 while (_data->numThreads < (size_t)count)
387 {
388 _data->threads.push_back (new WorkerThread (_data));
389 _data->numThreads++;
390 }
391 }
392 }
393
394
395 void
addTask(Task * task)396 ThreadPool::addTask (Task* task)
397 {
398 //
399 // Lock the threads, needed to access numThreads
400 //
401
402 Lock lock (_data->threadMutex);
403
404 if (_data->numThreads == 0)
405 {
406 task->execute ();
407 delete task;
408 }
409 else
410 {
411 //
412 // Get exclusive access to the tasks queue
413 //
414
415 {
416 Lock taskLock (_data->taskMutex);
417
418 //
419 // Push the new task into the FIFO
420 //
421
422 _data->tasks.push_back (task);
423 _data->numTasks++;
424 task->group()->_data->addTask();
425 }
426
427 //
428 // Signal that we have a new task to process
429 //
430
431 _data->taskSemaphore.post ();
432 }
433 }
434
435
436 ThreadPool&
globalThreadPool()437 ThreadPool::globalThreadPool ()
438 {
439 //
440 // The global thread pool
441 //
442
443 static ThreadPool gThreadPool (0);
444
445 return gThreadPool;
446 }
447
448
449 void
addGlobalTask(Task * task)450 ThreadPool::addGlobalTask (Task* task)
451 {
452 globalThreadPool().addTask (task);
453 }
454
455
456 } // namespace IlmThread
457