1 /*M///////////////////////////////////////////////////////////////////////////////////////
2 //
3 //  IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
4 //
5 //  By downloading, copying, installing or using the software you agree to this license.
6 //  If you do not agree to this license, do not download, install,
7 //  copy or use the software.
8 //
9 //
10 //                           License Agreement
11 //                For Open Source Computer Vision Library
12 //
13 // Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
14 // Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
15 // Third party copyrights are property of their respective owners.
16 //
17 // Redistribution and use in source and binary forms, with or without modification,
18 // are permitted provided that the following conditions are met:
19 //
20 //   * Redistribution's of source code must retain the above copyright notice,
21 //     this list of conditions and the following disclaimer.
22 //
23 //   * Redistribution's in binary form must reproduce the above copyright notice,
24 //     this list of conditions and the following disclaimer in the documentation
25 //     and/or other materials provided with the distribution.
26 //
27 //   * The name of the copyright holders may not be used to endorse or promote products
28 //     derived from this software without specific prior written permission.
29 //
30 // This software is provided by the copyright holders and contributors "as is" and
31 // any express or implied warranties, including, but not limited to, the implied
32 // warranties of merchantability and fitness for a particular purpose are disclaimed.
33 // In no event shall the Intel Corporation or contributors be liable for any direct,
34 // indirect, incidental, special, exemplary, or consequential damages
35 // (including, but not limited to, procurement of substitute goods or services;
36 // loss of use, data, or profits; or business interruption) however caused
37 // and on any theory of liability, whether in contract, strict liability,
38 // or tort (including negligence or otherwise) arising in any way out of
39 // the use of this software, even if advised of the possibility of such damage.
40 //
41 //M*/
42 
43 #include "precomp.hpp"
44 
45 #if defined HAVE_PTHREADS && HAVE_PTHREADS
46 
47 #include <algorithm>
48 #include <pthread.h>
49 
50 namespace cv
51 {
52 
class ThreadManager;

// Lifecycle of a single ForThread worker.
enum ForThreadState
{
    eFTNotStarted = 0,  // constructed; worker thread has not reached thread_body() yet
    eFTStarted = 1,     // set by the worker itself at the top of thread_body()
    eFTToStop = 2,      // shutdown requested by ForThread::stop()
    eFTStoped = 3       // (sic) assigned at the end of ForThread::stop()
};
62 
// State of the ThreadManager's worker pool.
enum ThreadManagerPoolState
{
    eTMNotInited = 0,       // thread count configured, workers not created yet (initPool() creates them lazily)
    eTMFailedToInit = 1,    // manager primitives or worker creation failed; setNumOfThreads() no longer reconfigures
    eTMInited = 2,          // workers created and waiting for work
    eTMSingleThreaded = 3   // thread count is 1; no workers are created
};
70 
71 struct work_load
72 {
work_loadcv::work_load73     work_load()
74     {
75         clear();
76     }
77 
work_loadcv::work_load78     work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
79     {
80         set(range, body, nstripes);
81     }
82 
setcv::work_load83     void set(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
84     {
85         m_body = &body;
86         m_range = &range;
87         m_nstripes = nstripes;
88         m_blocks_count = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
89     }
90 
91     const cv::ParallelLoopBody* m_body;
92     const cv::Range*            m_range;
93     int                         m_nstripes;
94     unsigned int                m_blocks_count;
95 
clearcv::work_load96     void clear()
97     {
98         m_body = 0;
99         m_range = 0;
100         m_nstripes = 0;
101         m_blocks_count = 0;
102     }
103 };
104 
// One worker of the pool.
//
// The manager calls init() once to create the POSIX thread, run() to wake it for
// the current work_load, and stop() to shut it down (see ForThreadState). The
// thread itself runs thread_body(), which waits on m_cond_thread_task and pulls
// blocks of the shared work_load in execute().
//
// NOTE(review): instances are stored in a std::vector and init() hands `this` to
// the new thread. The vector must therefore not copy or relocate elements after
// init(), because copying pthread primitives is undefined.
class ForThread
{
public:

    ForThread(): m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
    {
    }

    //called from manager thread
    bool init(size_t id, ThreadManager* parent);

    //called from manager thread
    void run();

    //called from manager thread
    void stop();

    ~ForThread();

private:

    //called from worker thread
    static void* thread_loop_wrapper(void* thread_object);

    //called from worker thread
    void execute();

    //called from worker thread
    void thread_body();

    pthread_t       m_posix_thread;
    pthread_mutex_t m_thread_mutex;      // guards the run()/thread_body() handshake
    pthread_cond_t  m_cond_thread_task;  // signalled by run() to wake the worker
    bool            m_task_start;        // set by run(), cleared by the worker after each task

    ThreadManager*  m_parent;            // manager that owns the pool (not owned here)
    ForThreadState  m_state;
    size_t          m_id;                // slot index passed by ThreadManager::initPool()
};
144 
// Process-wide manager of the worker pool used by parallel_for_pthreads().
//
// The single instance is created lazily by instance(). run() dispatches one loop
// at a time to the pool, and setNumOfThreads() reconfigures the pool size.
// m_manager_access_mutex serialises instance creation, run() and
// setNumOfThreads().
class ThreadManager
{
public:
    friend class ForThread;

    // Returns the manager, creating it on first use.
    // NOTE(review): the unlocked first check on a plain pointer is double-checked
    // locking without atomics/barriers, which is formally a data race under the
    // C++11 memory model.
    static ThreadManager& instance()
    {
        if(!m_instance.ptr)
        {
            pthread_mutex_lock(&m_manager_access_mutex);

            if(!m_instance.ptr)
            {
                m_instance.ptr = new ThreadManager();
            }

            pthread_mutex_unlock(&m_manager_access_mutex);
        }

        return *m_instance.ptr;
    }


    // Stops and joins every worker if the pool is initialised, then marks the
    // pool as not initialised. The ForThread objects remain in m_threads.
    static void stop()
    {
        ThreadManager& manager = instance();

        if(manager.m_pool_state == eTMInited)
        {
            for(size_t i = 0; i < manager.m_num_threads; ++i)
            {
                manager.m_threads[i].stop();
            }
        }

        manager.m_pool_state = eTMNotInited;
    }

    // Runs body over range, in parallel when possible (see definition).
    void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);

    // Configured worker count (the pool may not have been created yet).
    size_t getNumOfThreads();

    // Reconfigures the worker count; 0 selects defaultNumberOfThreads().
    void setNumOfThreads(size_t n);

private:

    // Owns the singleton and deletes it during static destruction.
    // NOTE(review): ptr is not reset after delete, so a later instance() call
    // would return a dangling reference.
    struct ptr_holder
    {
        ThreadManager* ptr;

        ptr_holder(): ptr(NULL) { }

        ~ptr_holder()
        {
            if(ptr)
            {
                delete ptr;
            }
        }
    };

    ThreadManager();

    ~ThreadManager();

    // Waits for all workers to finish the current loop and releases the locks run() took.
    void wait_complete();

    // Called by each worker after it finishes the current loop.
    void notify_complete();

    // Lazily creates the worker threads.
    bool initPool();

    size_t defaultNumberOfThreads();

    std::vector<ForThread> m_threads;
    size_t m_num_threads;                      // configured worker count

    pthread_mutex_t m_manager_task_mutex;      // guards m_task_complete
    pthread_cond_t  m_cond_thread_task_complete;
    bool            m_task_complete;           // set by the last worker to finish a loop

    unsigned int m_task_position;              // next block index to hand out (CV_XADD)
    unsigned int m_num_of_completed_tasks;     // workers that finished the current loop (CV_XADD)

    static pthread_mutex_t m_manager_access_mutex;
    static ptr_holder m_instance;

    static const char m_env_name[];
    static const unsigned int m_default_number_of_threads;

    work_load m_work_load;                     // loop currently being executed

    // Per-thread flag, true on pool workers; run() uses it to run nested loops serially.
    struct work_thread_t
    {
        work_thread_t(): value(false) { }
        bool value;
    };

    cv::TLSData<work_thread_t> m_is_work_thread;

    ThreadManagerPoolState m_pool_state;
};
246 
// If the platform does not provide the _NP (non-portable) spelling of the
// recursive mutex initializer, fall back to PTHREAD_RECURSIVE_MUTEX_INITIALIZER.
#ifndef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
#define PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP PTHREAD_RECURSIVE_MUTEX_INITIALIZER
#endif

// The mutex must be recursive for two reasons. First, instance() holds it while
// constructing the manager, and the constructor calls setNumOfThreads(), which
// locks it again. Second, setNumOfThreads() calls stop() while holding it.
pthread_mutex_t ThreadManager::m_manager_access_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

ThreadManager::ptr_holder ThreadManager::m_instance;
// Environment variable consulted by defaultNumberOfThreads().
const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";

#ifdef ANDROID
// many modern phones/tablets have 4-core CPUs. Let's use no more
// than 2 threads by default not to overheat the devices
const unsigned int ThreadManager::m_default_number_of_threads = 2;
#else
const unsigned int ThreadManager::m_default_number_of_threads = 8;
#endif
263 
~ForThread()264 ForThread::~ForThread()
265 {
266     if(m_state == eFTStarted)
267     {
268         stop();
269 
270         pthread_mutex_destroy(&m_thread_mutex);
271 
272         pthread_cond_destroy(&m_cond_thread_task);
273     }
274 }
275 
init(size_t id,ThreadManager * parent)276 bool ForThread::init(size_t id, ThreadManager* parent)
277 {
278     m_id = id;
279 
280     m_parent = parent;
281 
282     int res = 0;
283 
284     res |= pthread_mutex_init(&m_thread_mutex, NULL);
285 
286     res |= pthread_cond_init(&m_cond_thread_task, NULL);
287 
288     if(!res)
289     {
290         res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
291     }
292 
293 
294     return res == 0;
295 }
296 
stop()297 void ForThread::stop()
298 {
299     if(m_state == eFTStarted)
300     {
301         m_state = eFTToStop;
302 
303         run();
304 
305         pthread_join(m_posix_thread, NULL);
306     }
307 
308     m_state = eFTStoped;
309 }
310 
run()311 void ForThread::run()
312 {
313     pthread_mutex_lock(&m_thread_mutex);
314 
315     m_task_start = true;
316 
317     pthread_cond_signal(&m_cond_thread_task);
318 
319     pthread_mutex_unlock(&m_thread_mutex);
320 }
321 
thread_loop_wrapper(void * thread_object)322 void* ForThread::thread_loop_wrapper(void* thread_object)
323 {
324     ((ForThread*)thread_object)->thread_body();
325     return 0;
326 }
327 
execute()328 void ForThread::execute()
329 {
330     unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
331 
332     work_load& load = m_parent->m_work_load;
333 
334     while(m_current_pos < load.m_blocks_count)
335     {
336         int start = load.m_range->start + m_current_pos*load.m_nstripes;
337         int end = std::min(start + load.m_nstripes, load.m_range->end);
338 
339         load.m_body->operator()(cv::Range(start, end));
340 
341         m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
342     }
343 }
344 
// Main loop of a worker thread.
//
// The worker marks itself as a pool thread (so nested parallel loops run
// serially), then holds m_thread_mutex for its whole lifetime except while
// blocked in pthread_cond_wait(). Each wake-up with m_task_start set executes one
// loop via execute(), clears the flag, and reports completion to the manager.
// The loop ends once m_state is no longer eFTStarted (stop() sets eFTToStop).
void ForThread::thread_body()
{
    m_parent->m_is_work_thread.get()->value = true;

    pthread_mutex_lock(&m_thread_mutex);

    // Announced by the worker itself; stop() only joins workers in this state.
    m_state = eFTStarted;

    while(m_state == eFTStarted)
    {
        //to handle spurious wakeups
        while( !m_task_start && m_state != eFTToStop )
            pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);

        if(m_state == eFTStarted)
        {
            execute();

            // Cleared before notifying, so the next run() sets it afresh.
            m_task_start = false;

            m_parent->notify_complete();
        }
    }

    pthread_mutex_unlock(&m_thread_mutex);
}
371 
ThreadManager()372 ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
373 {
374     int res = 0;
375 
376     res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
377 
378     res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
379 
380     if(!res)
381     {
382         setNumOfThreads(defaultNumberOfThreads());
383 
384         m_task_position = 0;
385     }
386     else
387     {
388         m_num_threads = 1;
389         m_pool_state = eTMFailedToInit;
390         m_task_position = 0;
391 
392         //print error;
393     }
394 }
395 
~ThreadManager()396 ThreadManager::~ThreadManager()
397 {
398     stop();
399 
400     pthread_mutex_destroy(&m_manager_task_mutex);
401 
402     pthread_cond_destroy(&m_cond_thread_task_complete);
403 
404     pthread_mutex_destroy(&m_manager_access_mutex);
405 }
406 
run(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)407 void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
408 {
409     bool is_work_thread = m_is_work_thread.get()->value;
410 
411     if( (getNumOfThreads() > 1) && !is_work_thread &&
412         (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) )
413     {
414         int res = pthread_mutex_trylock(&m_manager_access_mutex);
415 
416         if(!res)
417         {
418             if(initPool())
419             {
420                 double min_stripes = double(range.end - range.start)/(4*m_threads.size());
421 
422                 nstripes = std::max(nstripes, min_stripes);
423 
424                 pthread_mutex_lock(&m_manager_task_mutex);
425 
426                 m_num_of_completed_tasks = 0;
427 
428                 m_task_position = 0;
429 
430                 m_task_complete = false;
431 
432                 m_work_load.set(range, body, std::ceil(nstripes));
433 
434                 for(size_t i = 0; i < m_threads.size(); ++i)
435                 {
436                     m_threads[i].run();
437                 }
438 
439                 wait_complete();
440             }
441             else
442             {
443                 //print error
444                 body(range);
445             }
446         }
447         else
448         {
449             body(range);
450         }
451     }
452     else
453     {
454         body(range);
455     }
456 }
457 
wait_complete()458 void ThreadManager::wait_complete()
459 {
460     //to handle spurious wakeups
461     while(!m_task_complete)
462         pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);
463 
464     pthread_mutex_unlock(&m_manager_task_mutex);
465 
466     pthread_mutex_unlock(&m_manager_access_mutex);
467 }
468 
notify_complete()469 void ThreadManager::notify_complete()
470 {
471 
472     unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
473 
474     if(comp == (m_num_threads - 1))
475     {
476         pthread_mutex_lock(&m_manager_task_mutex);
477 
478         m_task_complete = true;
479 
480         pthread_cond_signal(&m_cond_thread_task_complete);
481 
482         pthread_mutex_unlock(&m_manager_task_mutex);
483     }
484 }
485 
initPool()486 bool ThreadManager::initPool()
487 {
488     if(m_pool_state != eTMNotInited || m_num_threads == 1)
489         return true;
490 
491     m_threads.resize(m_num_threads);
492 
493     bool res = true;
494 
495     for(size_t i = 0; i < m_threads.size(); ++i)
496     {
497         res |= m_threads[i].init(i, this);
498     }
499 
500     if(res)
501     {
502         m_pool_state = eTMInited;
503     }
504     else
505     {
506         //TODO: join threads?
507         m_pool_state = eTMFailedToInit;
508     }
509 
510     return res;
511 }
512 
getNumOfThreads()513 size_t ThreadManager::getNumOfThreads()
514 {
515     return m_num_threads;
516 }
517 
setNumOfThreads(size_t n)518 void ThreadManager::setNumOfThreads(size_t n)
519 {
520     int res = pthread_mutex_lock(&m_manager_access_mutex);
521 
522     if(!res)
523     {
524         if(n == 0)
525         {
526             n = defaultNumberOfThreads();
527         }
528 
529         if(n != m_num_threads && m_pool_state != eTMFailedToInit)
530         {
531             if(m_pool_state == eTMInited)
532             {
533                 stop();
534                 m_threads.clear();
535             }
536 
537             m_num_threads = n;
538 
539             if(m_num_threads == 1)
540             {
541                 m_pool_state = eTMSingleThreaded;
542             }
543             else
544             {
545                 m_pool_state = eTMNotInited;
546             }
547         }
548 
549         pthread_mutex_unlock(&m_manager_access_mutex);
550     }
551 }
552 
defaultNumberOfThreads()553 size_t ThreadManager::defaultNumberOfThreads()
554 {
555     unsigned int result = m_default_number_of_threads;
556 
557     char * env = getenv(m_env_name);
558 
559     if(env != NULL)
560     {
561         sscanf(env, "%u", &result);
562 
563         result = std::max(1u, result);
564         //do we need upper limit of threads number?
565     }
566 
567     return result;
568 }
569 
// Backend entry points, defined below. NOTE(review): they are presumably
// declared here to match declarations in the generic parallel framework and/or
// to satisfy -Wmissing-declarations; confirm against the caller.
void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
size_t parallel_pthreads_get_threads_num();
void parallel_pthreads_set_threads_num(int num);
573 
parallel_pthreads_get_threads_num()574 size_t parallel_pthreads_get_threads_num()
575 {
576     return ThreadManager::instance().getNumOfThreads();
577 }
578 
parallel_pthreads_set_threads_num(int num)579 void parallel_pthreads_set_threads_num(int num)
580 {
581     if(num < 0)
582     {
583         ThreadManager::instance().setNumOfThreads(0);
584     }
585     else
586     {
587         ThreadManager::instance().setNumOfThreads(size_t(num));
588     }
589 }
590 
parallel_for_pthreads(const cv::Range & range,const cv::ParallelLoopBody & body,double nstripes)591 void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
592 {
593     ThreadManager::instance().run(range, body, nstripes);
594 }
595 
596 }
597 
598 #endif
599