1 /*M///////////////////////////////////////////////////////////////////////////////////////
2 //
3 // IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
4 //
5 // By downloading, copying, installing or using the software you agree to this license.
6 // If you do not agree to this license, do not download, install,
7 // copy or use the software.
8 //
9 //
10 // License Agreement
11 // For Open Source Computer Vision Library
12 //
13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
15 // Third party copyrights are property of their respective owners.
16 //
17 // Redistribution and use in source and binary forms, with or without modification,
18 // are permitted provided that the following conditions are met:
19 //
20 // * Redistribution's of source code must retain the above copyright notice,
21 // this list of conditions and the following disclaimer.
22 //
23 // * Redistribution's in binary form must reproduce the above copyright notice,
24 // this list of conditions and the following disclaimer in the documentation
25 // and/or other materials provided with the distribution.
26 //
27 // * The name of the copyright holders may not be used to endorse or promote products
28 // derived from this software without specific prior written permission.
29 //
30 // This software is provided by the copyright holders and contributors "as is" and
31 // any express or implied warranties, including, but not limited to, the implied
32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
33 // In no event shall the Intel Corporation or contributors be liable for any direct,
34 // indirect, incidental, special, exemplary, or consequential damages
35 // (including, but not limited to, procurement of substitute goods or services;
36 // loss of use, data, or profits; or business interruption) however caused
37 // and on any theory of liability, whether in contract, strict liability,
38 // or tort (including negligence or otherwise) arising in any way out of
39 // the use of this software, even if advised of the possibility of such damage.
40 //
41 //M*/
42
43 #include "precomp.hpp"
44
45 #if defined HAVE_PTHREADS && HAVE_PTHREADS
46
#include <algorithm>
#include <climits>
#include <cmath>
#include <cstdlib>
#include <pthread.h>
49
50 namespace cv
51 {
52
class ThreadManager;

// Lifecycle of one pooled worker (stored in ForThread::m_state).
// Values are implicit and sequential: 0, 1, 2, 3.
// The misspelling "eFTStoped" is kept because other code refers to it.
enum ForThreadState
{
    eFTNotStarted,
    eFTStarted,
    eFTToStop,
    eFTStoped
};

// State of the ThreadManager's worker pool (stored in m_pool_state).
// Values are implicit and sequential: 0, 1, 2, 3.
enum ThreadManagerPoolState
{
    eTMNotInited,
    eTMFailedToInit,
    eTMInited,
    eTMSingleThreaded
};
70
71 struct work_load
72 {
work_loadcv::work_load73 work_load()
74 {
75 clear();
76 }
77
work_loadcv::work_load78 work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
79 {
80 set(range, body, nstripes);
81 }
82
setcv::work_load83 void set(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
84 {
85 m_body = &body;
86 m_range = ⦥
87 m_nstripes = nstripes;
88 m_blocks_count = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
89 }
90
91 const cv::ParallelLoopBody* m_body;
92 const cv::Range* m_range;
93 int m_nstripes;
94 unsigned int m_blocks_count;
95
clearcv::work_load96 void clear()
97 {
98 m_body = 0;
99 m_range = 0;
100 m_nstripes = 0;
101 m_blocks_count = 0;
102 }
103 };
104
105 class ForThread
106 {
107 public:
108
ForThread()109 ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
110 {
111 }
112
113 //called from manager thread
114 bool init(size_t id, ThreadManager* parent);
115
116 //called from manager thread
117 void run();
118
119 //called from manager thread
120 void stop();
121
122 ~ForThread();
123
124 private:
125
126 //called from worker thread
127 static void* thread_loop_wrapper(void* thread_object);
128
129 //called from worker thread
130 void execute();
131
132 //called from worker thread
133 void thread_body();
134
135 pthread_t m_posix_thread;
136 pthread_mutex_t m_thread_mutex;
137 pthread_cond_t m_cond_thread_task;
138 bool m_task_start;
139
140 ThreadManager* m_parent;
141 ForThreadState m_state;
142 size_t m_id;
143 };
144
145 class ThreadManager
146 {
147 public:
148 friend class ForThread;
149
instance()150 static ThreadManager& instance()
151 {
152 if(!m_instance.ptr)
153 {
154 pthread_mutex_lock(&m_manager_access_mutex);
155
156 if(!m_instance.ptr)
157 {
158 m_instance.ptr = new ThreadManager();
159 }
160
161 pthread_mutex_unlock(&m_manager_access_mutex);
162 }
163
164 return *m_instance.ptr;
165 }
166
167
stop()168 static void stop()
169 {
170 ThreadManager& manager = instance();
171
172 if(manager.m_pool_state == eTMInited)
173 {
174 for(size_t i = 0; i < manager.m_num_threads; ++i)
175 {
176 manager.m_threads[i].stop();
177 }
178 }
179
180 manager.m_pool_state = eTMNotInited;
181 }
182
183 void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
184
185 size_t getNumOfThreads();
186
187 void setNumOfThreads(size_t n);
188
189 private:
190
191 struct ptr_holder
192 {
193 ThreadManager* ptr;
194
ptr_holdercv::ThreadManager::ptr_holder195 ptr_holder(): ptr(NULL) { }
196
~ptr_holdercv::ThreadManager::ptr_holder197 ~ptr_holder()
198 {
199 if(ptr)
200 {
201 delete ptr;
202 }
203 }
204 };
205
206 ThreadManager();
207
208 ~ThreadManager();
209
210 void wait_complete();
211
212 void notify_complete();
213
214 bool initPool();
215
216 size_t defaultNumberOfThreads();
217
218 std::vector<ForThread> m_threads;
219 size_t m_num_threads;
220
221 pthread_mutex_t m_manager_task_mutex;
222 pthread_cond_t m_cond_thread_task_complete;
223 bool m_task_complete;
224
225 unsigned int m_task_position;
226 unsigned int m_num_of_completed_tasks;
227
228 static pthread_mutex_t m_manager_access_mutex;
229 static ptr_holder m_instance;
230
231 static const char m_env_name[];
232 static const unsigned int m_default_number_of_threads;
233
234 work_load m_work_load;
235
236 struct work_thread_t
237 {
work_thread_tcv::ThreadManager::work_thread_t238 work_thread_t(): value(false) { }
239 bool value;
240 };
241
242 cv::TLSData<work_thread_t> m_is_work_thread;
243
244 ThreadManagerPoolState m_pool_state;
245 };
246
247 #ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
248 #define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP PTHREAD_RECURSIVE_MUTEX_INITIALIZER
249 #endif
250
251 pthread_mutex_t ThreadManager::m_manager_access_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
252
253 ThreadManager::ptr_holder ThreadManager::m_instance;
254 const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";
255
256 #ifdef ANDROID
257 // many modern phones/tables have 4-core CPUs. Let's use no more
258 // than 2 threads by default not to overheat the devices
259 const unsigned int ThreadManager::m_default_number_of_threads = 2;
260 #else
261 const unsigned int ThreadManager::m_default_number_of_threads = 8;
262 #endif
263
~ForThread()264 ForThread::~ForThread()
265 {
266 if(m_state == eFTStarted)
267 {
268 stop();
269
270 pthread_mutex_destroy(&m_thread_mutex);
271
272 pthread_cond_destroy(&m_cond_thread_task);
273 }
274 }
275
init(size_t id,ThreadManager * parent)276 bool ForThread::init(size_t id, ThreadManager* parent)
277 {
278 m_id = id;
279
280 m_parent = parent;
281
282 int res = 0;
283
284 res |= pthread_mutex_init(&m_thread_mutex, NULL);
285
286 res |= pthread_cond_init(&m_cond_thread_task, NULL);
287
288 if(!res)
289 {
290 res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
291 }
292
293
294 return res == 0;
295 }
296
stop()297 void ForThread::stop()
298 {
299 if(m_state == eFTStarted)
300 {
301 m_state = eFTToStop;
302
303 run();
304
305 pthread_join(m_posix_thread, NULL);
306 }
307
308 m_state = eFTStoped;
309 }
310
run()311 void ForThread::run()
312 {
313 pthread_mutex_lock(&m_thread_mutex);
314
315 m_task_start = true;
316
317 pthread_cond_signal(&m_cond_thread_task);
318
319 pthread_mutex_unlock(&m_thread_mutex);
320 }
321
thread_loop_wrapper(void * thread_object)322 void* ForThread::thread_loop_wrapper(void* thread_object)
323 {
324 ((ForThread*)thread_object)->thread_body();
325 return 0;
326 }
327
execute()328 void ForThread::execute()
329 {
330 unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
331
332 work_load& load = m_parent->m_work_load;
333
334 while(m_current_pos < load.m_blocks_count)
335 {
336 int start = load.m_range->start + m_current_pos*load.m_nstripes;
337 int end = std::min(start + load.m_nstripes, load.m_range->end);
338
339 load.m_body->operator()(cv::Range(start, end));
340
341 m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
342 }
343 }
344
thread_body()345 void ForThread::thread_body()
346 {
347 m_parent->m_is_work_thread.get()->value = true;
348
349 pthread_mutex_lock(&m_thread_mutex);
350
351 m_state = eFTStarted;
352
353 while(m_state == eFTStarted)
354 {
355 //to handle spurious wakeups
356 while( !m_task_start && m_state != eFTToStop )
357 pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);
358
359 if(m_state == eFTStarted)
360 {
361 execute();
362
363 m_task_start = false;
364
365 m_parent->notify_complete();
366 }
367 }
368
369 pthread_mutex_unlock(&m_thread_mutex);
370 }
371
ThreadManager()372 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
373 {
374 int res = 0;
375
376 res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
377
378 res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
379
380 if(!res)
381 {
382 setNumOfThreads(defaultNumberOfThreads());
383
384 m_task_position = 0;
385 }
386 else
387 {
388 m_num_threads = 1;
389 m_pool_state = eTMFailedToInit;
390 m_task_position = 0;
391
392 //print error;
393 }
394 }
395
~ThreadManager()396 ThreadManager::~ThreadManager()
397 {
398 stop();
399
400 pthread_mutex_destroy(&m_manager_task_mutex);
401
402 pthread_cond_destroy(&m_cond_thread_task_complete);
403
404 pthread_mutex_destroy(&m_manager_access_mutex);
405 }
406
run(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)407 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
408 {
409 bool is_work_thread = m_is_work_thread.get()->value;
410
411 if( (getNumOfThreads() > 1) && !is_work_thread &&
412 (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) )
413 {
414 int res = pthread_mutex_trylock(&m_manager_access_mutex);
415
416 if(!res)
417 {
418 if(initPool())
419 {
420 double min_stripes = double(range.end - range.start)/(4*m_threads.size());
421
422 nstripes = std::max(nstripes, min_stripes);
423
424 pthread_mutex_lock(&m_manager_task_mutex);
425
426 m_num_of_completed_tasks = 0;
427
428 m_task_position = 0;
429
430 m_task_complete = false;
431
432 m_work_load.set(range, body, std::ceil(nstripes));
433
434 for(size_t i = 0; i < m_threads.size(); ++i)
435 {
436 m_threads[i].run();
437 }
438
439 wait_complete();
440 }
441 else
442 {
443 //print error
444 body(range);
445 }
446 }
447 else
448 {
449 body(range);
450 }
451 }
452 else
453 {
454 body(range);
455 }
456 }
457
wait_complete()458 void ThreadManager::wait_complete()
459 {
460 //to handle spurious wakeups
461 while(!m_task_complete)
462 pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);
463
464 pthread_mutex_unlock(&m_manager_task_mutex);
465
466 pthread_mutex_unlock(&m_manager_access_mutex);
467 }
468
notify_complete()469 void ThreadManager::notify_complete()
470 {
471
472 unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
473
474 if(comp == (m_num_threads - 1))
475 {
476 pthread_mutex_lock(&m_manager_task_mutex);
477
478 m_task_complete = true;
479
480 pthread_cond_signal(&m_cond_thread_task_complete);
481
482 pthread_mutex_unlock(&m_manager_task_mutex);
483 }
484 }
485
initPool()486 bool ThreadManager::initPool()
487 {
488 if(m_pool_state != eTMNotInited || m_num_threads == 1)
489 return true;
490
491 m_threads.resize(m_num_threads);
492
493 bool res = true;
494
495 for(size_t i = 0; i < m_threads.size(); ++i)
496 {
497 res |= m_threads[i].init(i, this);
498 }
499
500 if(res)
501 {
502 m_pool_state = eTMInited;
503 }
504 else
505 {
506 //TODO: join threads?
507 m_pool_state = eTMFailedToInit;
508 }
509
510 return res;
511 }
512
getNumOfThreads()513 size_t ThreadManager::getNumOfThreads()
514 {
515 return m_num_threads;
516 }
517
setNumOfThreads(size_t n)518 void ThreadManager::setNumOfThreads(size_t n)
519 {
520 int res = pthread_mutex_lock(&m_manager_access_mutex);
521
522 if(!res)
523 {
524 if(n == 0)
525 {
526 n = defaultNumberOfThreads();
527 }
528
529 if(n != m_num_threads && m_pool_state != eTMFailedToInit)
530 {
531 if(m_pool_state == eTMInited)
532 {
533 stop();
534 m_threads.clear();
535 }
536
537 m_num_threads = n;
538
539 if(m_num_threads == 1)
540 {
541 m_pool_state = eTMSingleThreaded;
542 }
543 else
544 {
545 m_pool_state = eTMNotInited;
546 }
547 }
548
549 pthread_mutex_unlock(&m_manager_access_mutex);
550 }
551 }
552
defaultNumberOfThreads()553 size_t ThreadManager::defaultNumberOfThreads()
554 {
555 unsigned int result = m_default_number_of_threads;
556
557 char * env = getenv(m_env_name);
558
559 if(env != NULL)
560 {
561 sscanf(env, "%u", &result);
562
563 result = std::max(1u, result);
564 //do we need upper limit of threads number?
565 }
566
567 return result;
568 }
569
570 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
571 size_t parallel_pthreads_get_threads_num();
572 void parallel_pthreads_set_threads_num(int num);
573
parallel_pthreads_get_threads_num()574 size_t parallel_pthreads_get_threads_num()
575 {
576 return ThreadManager::instance().getNumOfThreads();
577 }
578
parallel_pthreads_set_threads_num(int num)579 void parallel_pthreads_set_threads_num(int num)
580 {
581 if(num < 0)
582 {
583 ThreadManager::instance().setNumOfThreads(0);
584 }
585 else
586 {
587 ThreadManager::instance().setNumOfThreads(size_t(num));
588 }
589 }
590
parallel_for_pthreads(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)591 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
592 {
593 ThreadManager::instance().run(range, body, nstripes);
594 }
595
596 }
597
598 #endif
599