1 ///////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2005, Industrial Light & Magic, a division of Lucas
4 // Digital Ltd. LLC
5 //
6 // All rights reserved.
7 //
8 // Redistribution and use in source and binary forms, with or without
9 // modification, are permitted provided that the following conditions are
10 // met:
11 // *       Redistributions of source code must retain the above copyright
12 // notice, this list of conditions and the following disclaimer.
13 // *       Redistributions in binary form must reproduce the above
14 // copyright notice, this list of conditions and the following disclaimer
15 // in the documentation and/or other materials provided with the
16 // distribution.
17 // *       Neither the name of Industrial Light & Magic nor the names of
18 // its contributors may be used to endorse or promote products derived
19 // from this software without specific prior written permission.
20 //
21 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 //
33 ///////////////////////////////////////////////////////////////////////////
34 
35 //-----------------------------------------------------------------------------
36 //
37 //	class Task, class ThreadPool, class TaskGroup
38 //
39 //-----------------------------------------------------------------------------
40 
41 #include "IlmThread.h"
42 #include "IlmThreadMutex.h"
43 #include "IlmThreadSemaphore.h"
44 #include "IlmThreadPool.h"
45 #include "Iex.h"
46 #include <list>
47 
48 using namespace std;
49 
50 namespace IlmThread {
51 namespace {
52 
53 class WorkerThread: public Thread
54 {
55   public:
56 
57     WorkerThread (ThreadPool::Data* data);
58 
59     virtual void	run ();
60 
61   private:
62 
63     ThreadPool::Data *	_data;
64 };
65 
66 } //namespace
67 
68 
69 struct TaskGroup::Data
70 {
71      Data ();
72     ~Data ();
73 
74     void	addTask () ;
75     void	removeTask ();
76 
77     Semaphore	isEmpty;	// used to signal that the taskgroup is empty
78     int		numPending;	// number of pending tasks to still execute
79 };
80 
81 
82 struct ThreadPool::Data
83 {
84      Data ();
85     ~Data();
86 
87     void	finish ();
88     bool	stopped () const;
89     void	stop ();
90 
91     Semaphore taskSemaphore;        // threads wait on this for ready tasks
92     Mutex taskMutex;                // mutual exclusion for the tasks list
93     list<Task*> tasks;              // the list of tasks to execute
94     size_t numTasks;                // fast access to list size
95                                     //   (list::size() can be O(n))
96 
97     Semaphore threadSemaphore;      // signaled when a thread starts executing
98     Mutex threadMutex;              // mutual exclusion for threads list
99     list<WorkerThread*> threads;    // the list of all threads
100     size_t numThreads;              // fast access to list size
101 
102     bool stopping;                  // flag indicating whether to stop threads
103     Mutex stopMutex;                // mutual exclusion for stopping flag
104 };
105 
106 
107 
108 //
109 // class WorkerThread
110 //
111 
WorkerThread(ThreadPool::Data * data)112 WorkerThread::WorkerThread (ThreadPool::Data* data):
113     _data (data)
114 {
115     start();
116 }
117 
118 
119 void
run()120 WorkerThread::run ()
121 {
122     //
123     // Signal that the thread has started executing
124     //
125 
126     _data->threadSemaphore.post();
127 
128     while (true)
129     {
130     //
131         // Wait for a task to become available
132     //
133 
134         _data->taskSemaphore.wait();
135 
136         {
137             Lock taskLock (_data->taskMutex);
138 
139         //
140             // If there is a task pending, pop off the next task in the FIFO
141         //
142 
143             if (_data->numTasks > 0)
144             {
145                 Task* task = _data->tasks.front();
146         TaskGroup* taskGroup = task->group();
147                 _data->tasks.pop_front();
148                 _data->numTasks--;
149 
150                 taskLock.release();
151                 task->execute();
152                 taskLock.acquire();
153 
154                 delete task;
155                 taskGroup->_data->removeTask();
156             }
157             else if (_data->stopped())
158         {
159                 break;
160         }
161         }
162     }
163 }
164 
165 
166 //
167 // struct TaskGroup::Data
168 //
169 
Data()170 TaskGroup::Data::Data (): isEmpty (1), numPending (0)
171 {
172     // empty
173 }
174 
175 
~Data()176 TaskGroup::Data::~Data ()
177 {
178     //
179     // A TaskGroup acts like an "inverted" semaphore: if the count
180     // is above 0 then waiting on the taskgroup will block.  This
181     // destructor waits until the taskgroup is empty before returning.
182     //
183 
184     isEmpty.wait ();
185 }
186 
187 
188 void
addTask()189 TaskGroup::Data::addTask ()
190 {
191     //
192     // Any access to the taskgroup is protected by a mutex that is
193     // held by the threadpool.  Therefore it is safe to access
194     // numPending before we wait on the semaphore.
195     //
196 
197     if (numPending++ == 0)
198     isEmpty.wait ();
199 }
200 
201 
202 void
removeTask()203 TaskGroup::Data::removeTask ()
204 {
205     if (--numPending == 0)
206     isEmpty.post ();
207 }
208 
209 
210 //
211 // struct ThreadPool::Data
212 //
213 
Data()214 ThreadPool::Data::Data (): numTasks (0), numThreads (0), stopping (false)
215 {
216     // empty
217 }
218 
219 
~Data()220 ThreadPool::Data::~Data()
221 {
222     Lock lock (threadMutex);
223     finish ();
224 }
225 
226 
227 void
finish()228 ThreadPool::Data::finish ()
229 {
230     stop();
231 
232     //
233     // Signal enough times to allow all threads to stop.
234     //
235     // Wait until all threads have started their run functions.
236     // If we do not wait before we destroy the threads then it's
237     // possible that the threads have not yet called their run
238     // functions.
239     // If this happens then the run function will be called off
240     // of an invalid object and we will crash, most likely with
241     // an error like: "pure virtual method called"
242     //
243 
244     for (size_t i = 0; i < numThreads; i++)
245     {
246     taskSemaphore.post();
247     threadSemaphore.wait();
248     }
249 
250     //
251     // Join all the threads
252     //
253 
254     for (list<WorkerThread*>::iterator i = threads.begin();
255      i != threads.end();
256      ++i)
257     {
258     delete (*i);
259     }
260 
261     Lock lock1 (taskMutex);
262     Lock lock2 (stopMutex);
263     threads.clear();
264     tasks.clear();
265     numThreads = 0;
266     numTasks = 0;
267     stopping = false;
268 }
269 
270 
271 bool
stopped() const272 ThreadPool::Data::stopped () const
273 {
274     Lock lock (stopMutex);
275     return stopping;
276 }
277 
278 
279 void
stop()280 ThreadPool::Data::stop ()
281 {
282     Lock lock (stopMutex);
283     stopping = true;
284 }
285 
286 
287 //
288 // class Task
289 //
290 
Task(TaskGroup * g)291 Task::Task (TaskGroup* g): _group(g)
292 {
293     // empty
294 }
295 
296 
~Task()297 Task::~Task()
298 {
299     // empty
300 }
301 
302 
303 TaskGroup*
group()304 Task::group ()
305 {
306     return _group;
307 }
308 
309 
TaskGroup()310 TaskGroup::TaskGroup ():
311     _data (new Data())
312 {
313     // empty
314 }
315 
316 
~TaskGroup()317 TaskGroup::~TaskGroup ()
318 {
319     delete _data;
320 }
321 
322 
323 //
324 // class ThreadPool
325 //
326 
ThreadPool(unsigned nthreads)327 ThreadPool::ThreadPool (unsigned nthreads):
328     _data (new Data())
329 {
330     setNumThreads (nthreads);
331 }
332 
333 
~ThreadPool()334 ThreadPool::~ThreadPool ()
335 {
336     delete _data;
337 }
338 
339 
340 int
numThreads() const341 ThreadPool::numThreads () const
342 {
343     Lock lock (_data->threadMutex);
344     return _data->numThreads;
345 }
346 
347 
348 void
setNumThreads(int count)349 ThreadPool::setNumThreads (int count)
350 {
351     if (count < 0)
352         throw Iex::ArgExc ("Attempt to set the number of threads "
353                "in a thread pool to a negative value.");
354 
355     //
356     // Lock access to thread list and size
357     //
358 
359     Lock lock (_data->threadMutex);
360 
361     if ((size_t)count > _data->numThreads)
362     {
363     //
364         // Add more threads
365     //
366 
367         while (_data->numThreads < (size_t)count)
368         {
369             _data->threads.push_back (new WorkerThread (_data));
370             _data->numThreads++;
371         }
372     }
373     else if ((size_t)count < _data->numThreads)
374     {
375     //
376     // Wait until all existing threads are finished processing,
377     // then delete all threads.
378     //
379 
380         _data->finish ();
381 
382     //
383         // Add in new threads
384     //
385 
386         while (_data->numThreads < (size_t)count)
387         {
388             _data->threads.push_back (new WorkerThread (_data));
389             _data->numThreads++;
390         }
391     }
392 }
393 
394 
395 void
addTask(Task * task)396 ThreadPool::addTask (Task* task)
397 {
398     //
399     // Lock the threads, needed to access numThreads
400     //
401 
402     Lock lock (_data->threadMutex);
403 
404     if (_data->numThreads == 0)
405     {
406         task->execute ();
407         delete task;
408     }
409     else
410     {
411     //
412         // Get exclusive access to the tasks queue
413     //
414 
415         {
416             Lock taskLock (_data->taskMutex);
417 
418         //
419             // Push the new task into the FIFO
420         //
421 
422             _data->tasks.push_back (task);
423             _data->numTasks++;
424             task->group()->_data->addTask();
425         }
426 
427     //
428         // Signal that we have a new task to process
429     //
430 
431         _data->taskSemaphore.post ();
432     }
433 }
434 
435 
436 ThreadPool&
globalThreadPool()437 ThreadPool::globalThreadPool ()
438 {
439     //
440     // The global thread pool
441     //
442 
443     static ThreadPool gThreadPool (0);
444 
445     return gThreadPool;
446 }
447 
448 
449 void
addGlobalTask(Task * task)450 ThreadPool::addGlobalTask (Task* task)
451 {
452     globalThreadPool().addTask (task);
453 }
454 
455 
456 } // namespace IlmThread
457