1 /*
2  * The copyright in this software is being made available under the 2-clauses
3  * BSD License, included below. This software may be subject to other third
4  * party and contributor rights, including patent rights, and no such rights
5  * are granted under this license.
6  *
7  * Copyright (c) 2016, Even Rouault
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 #include <assert.h>
33 
34 #ifdef MUTEX_win32
35 
36 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
37 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
38 /* a rather unlikely race, skip it */
39 #if !(defined(__MINGW32__) && defined(__i386__))
40 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
41 #endif
42 
43 #include <windows.h>
44 #include <process.h>
45 
46 #include "opj_includes.h"
47 
opj_has_thread_support(void)48 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
49 {
50     return OPJ_TRUE;
51 }
52 
opj_get_num_cpus(void)53 int OPJ_CALLCONV opj_get_num_cpus(void)
54 {
55     SYSTEM_INFO info;
56     DWORD dwNum;
57     GetSystemInfo(&info);
58     dwNum = info.dwNumberOfProcessors;
59     if (dwNum < 1) {
60         return 1;
61     }
62     return (int)dwNum;
63 }
64 
65 struct opj_mutex_t {
66     CRITICAL_SECTION cs;
67 };
68 
opj_mutex_create(void)69 opj_mutex_t* opj_mutex_create(void)
70 {
71     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
72     if (!mutex) {
73         return NULL;
74     }
75     InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
76     return mutex;
77 }
78 
opj_mutex_lock(opj_mutex_t * mutex)79 void opj_mutex_lock(opj_mutex_t* mutex)
80 {
81     EnterCriticalSection(&(mutex->cs));
82 }
83 
opj_mutex_unlock(opj_mutex_t * mutex)84 void opj_mutex_unlock(opj_mutex_t* mutex)
85 {
86     LeaveCriticalSection(&(mutex->cs));
87 }
88 
opj_mutex_destroy(opj_mutex_t * mutex)89 void opj_mutex_destroy(opj_mutex_t* mutex)
90 {
91     if (!mutex) {
92         return;
93     }
94     DeleteCriticalSection(&(mutex->cs));
95     opj_free(mutex);
96 }
97 
98 struct opj_cond_waiter_list_t {
99     HANDLE hEvent;
100     struct opj_cond_waiter_list_t* next;
101 };
102 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
103 
104 struct opj_cond_t {
105     opj_mutex_t             *internal_mutex;
106     opj_cond_waiter_list_t  *waiter_list;
107 };
108 
109 static DWORD TLSKey = 0;
110 static volatile LONG inTLSLockedSection = 0;
111 static volatile int TLSKeyInit = OPJ_FALSE;
112 
opj_cond_create(void)113 opj_cond_t* opj_cond_create(void)
114 {
115     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
116     if (!cond) {
117         return NULL;
118     }
119 
120     /* Make sure that the TLS key is allocated in a thread-safe way */
121     /* We cannot use a global mutex/critical section since its creation itself would not be */
122     /* thread-safe, so use InterlockedCompareExchange trick */
123     while (OPJ_TRUE) {
124 
125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
126         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
127 #endif
128         {
129             if (!TLSKeyInit) {
130                 TLSKey = TlsAlloc();
131                 TLSKeyInit = OPJ_TRUE;
132             }
133 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
134             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
135 #endif
136             break;
137         }
138     }
139 
140     if (TLSKey == TLS_OUT_OF_INDEXES) {
141         opj_free(cond);
142         return NULL;
143     }
144     cond->internal_mutex = opj_mutex_create();
145     if (cond->internal_mutex == NULL) {
146         opj_free(cond);
147         return NULL;
148     }
149     cond->waiter_list = NULL;
150     return cond;
151 }
152 
opj_cond_wait(opj_cond_t * cond,opj_mutex_t * mutex)153 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
154 {
155     opj_cond_waiter_list_t* item;
156     HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey);
157     if (hEvent == NULL) {
158         hEvent = CreateEvent(NULL, /* security attributes */
159                              0,    /* manual reset = no */
160                              0,    /* initial state = unsignaled */
161                              NULL  /* no name */);
162         assert(hEvent);
163 
164         TlsSetValue(TLSKey, hEvent);
165     }
166 
167     /* Insert the waiter into the waiter list of the condition */
168     opj_mutex_lock(cond->internal_mutex);
169 
170     item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
171     assert(item != NULL);
172 
173     item->hEvent = hEvent;
174     item->next = cond->waiter_list;
175 
176     cond->waiter_list = item;
177 
178     opj_mutex_unlock(cond->internal_mutex);
179 
180     /* Release the client mutex before waiting for the event being signaled */
181     opj_mutex_unlock(mutex);
182 
183     /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
184     /* to report a failure. */
185     WaitForSingleObject(hEvent, INFINITE);
186 
187     /* Reacquire the client mutex */
188     opj_mutex_lock(mutex);
189 }
190 
opj_cond_signal(opj_cond_t * cond)191 void opj_cond_signal(opj_cond_t* cond)
192 {
193     opj_cond_waiter_list_t* psIter;
194 
195     /* Signal the first registered event, and remove it from the list */
196     opj_mutex_lock(cond->internal_mutex);
197 
198     psIter = cond->waiter_list;
199     if (psIter != NULL) {
200         SetEvent(psIter->hEvent);
201         cond->waiter_list = psIter->next;
202         opj_free(psIter);
203     }
204 
205     opj_mutex_unlock(cond->internal_mutex);
206 }
207 
opj_cond_destroy(opj_cond_t * cond)208 void opj_cond_destroy(opj_cond_t* cond)
209 {
210     if (!cond) {
211         return;
212     }
213     opj_mutex_destroy(cond->internal_mutex);
214     assert(cond->waiter_list == NULL);
215     opj_free(cond);
216 }
217 
218 struct opj_thread_t {
219     opj_thread_fn thread_fn;
220     void* user_data;
221     HANDLE hThread;
222 };
223 
opj_thread_callback_adapter(void * info)224 unsigned int __stdcall opj_thread_callback_adapter(void *info)
225 {
226     opj_thread_t* thread = (opj_thread_t*) info;
227     HANDLE hEvent = NULL;
228 
229     thread->thread_fn(thread->user_data);
230 
231     /* Free the handle possible allocated by a cond */
232     while (OPJ_TRUE) {
233         /* Make sure TLSKey is not being created just at that moment... */
234 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
235         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
236 #endif
237         {
238             if (TLSKeyInit) {
239                 hEvent = (HANDLE) TlsGetValue(TLSKey);
240             }
241 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
242             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
243 #endif
244             break;
245         }
246     }
247     if (hEvent) {
248         CloseHandle(hEvent);
249     }
250 
251     return 0;
252 }
253 
opj_thread_create(opj_thread_fn thread_fn,void * user_data)254 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
255 {
256     opj_thread_t* thread;
257 
258     assert(thread_fn);
259 
260     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
261     if (!thread) {
262         return NULL;
263     }
264     thread->thread_fn = thread_fn;
265     thread->user_data = user_data;
266 
267     thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
268                       opj_thread_callback_adapter, thread, 0, NULL);
269 
270     if (thread->hThread == NULL) {
271         opj_free(thread);
272         return NULL;
273     }
274     return thread;
275 }
276 
opj_thread_join(opj_thread_t * thread)277 void opj_thread_join(opj_thread_t* thread)
278 {
279     WaitForSingleObject(thread->hThread, INFINITE);
280     CloseHandle(thread->hThread);
281 
282     opj_free(thread);
283 }
284 
285 #elif MUTEX_pthread
286 
287 #include <pthread.h>
288 #include <stdlib.h>
289 #include <unistd.h>
290 
291 /* Moved after all system includes, and in particular pthread.h, so as to */
292 /* avoid poisoning issuing with malloc() use in pthread.h with ulibc (#1013) */
293 #include "opj_includes.h"
294 
opj_has_thread_support(void)295 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
296 {
297     return OPJ_TRUE;
298 }
299 
opj_get_num_cpus(void)300 int OPJ_CALLCONV opj_get_num_cpus(void)
301 {
302 #ifdef _SC_NPROCESSORS_ONLN
303     return (int)sysconf(_SC_NPROCESSORS_ONLN);
304 #else
305     return 1;
306 #endif
307 }
308 
309 struct opj_mutex_t {
310     pthread_mutex_t mutex;
311 };
312 
opj_mutex_create(void)313 opj_mutex_t* opj_mutex_create(void)
314 {
315     opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
316     if (mutex != NULL) {
317         if (pthread_mutex_init(&mutex->mutex, NULL) != 0) {
318             opj_free(mutex);
319             mutex = NULL;
320         }
321     }
322     return mutex;
323 }
324 
opj_mutex_lock(opj_mutex_t * mutex)325 void opj_mutex_lock(opj_mutex_t* mutex)
326 {
327     pthread_mutex_lock(&(mutex->mutex));
328 }
329 
opj_mutex_unlock(opj_mutex_t * mutex)330 void opj_mutex_unlock(opj_mutex_t* mutex)
331 {
332     pthread_mutex_unlock(&(mutex->mutex));
333 }
334 
opj_mutex_destroy(opj_mutex_t * mutex)335 void opj_mutex_destroy(opj_mutex_t* mutex)
336 {
337     if (!mutex) {
338         return;
339     }
340     pthread_mutex_destroy(&(mutex->mutex));
341     opj_free(mutex);
342 }
343 
344 struct opj_cond_t {
345     pthread_cond_t cond;
346 };
347 
opj_cond_create(void)348 opj_cond_t* opj_cond_create(void)
349 {
350     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
351     if (!cond) {
352         return NULL;
353     }
354     if (pthread_cond_init(&(cond->cond), NULL) != 0) {
355         opj_free(cond);
356         return NULL;
357     }
358     return cond;
359 }
360 
opj_cond_wait(opj_cond_t * cond,opj_mutex_t * mutex)361 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
362 {
363     pthread_cond_wait(&(cond->cond), &(mutex->mutex));
364 }
365 
opj_cond_signal(opj_cond_t * cond)366 void opj_cond_signal(opj_cond_t* cond)
367 {
368     int ret = pthread_cond_signal(&(cond->cond));
369     (void)ret;
370     assert(ret == 0);
371 }
372 
opj_cond_destroy(opj_cond_t * cond)373 void opj_cond_destroy(opj_cond_t* cond)
374 {
375     if (!cond) {
376         return;
377     }
378     pthread_cond_destroy(&(cond->cond));
379     opj_free(cond);
380 }
381 
382 
383 struct opj_thread_t {
384     opj_thread_fn thread_fn;
385     void* user_data;
386     pthread_t thread;
387 };
388 
opj_thread_callback_adapter(void * info)389 static void* opj_thread_callback_adapter(void* info)
390 {
391     opj_thread_t* thread = (opj_thread_t*) info;
392     thread->thread_fn(thread->user_data);
393     return NULL;
394 }
395 
opj_thread_create(opj_thread_fn thread_fn,void * user_data)396 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
397 {
398     pthread_attr_t attr;
399     opj_thread_t* thread;
400 
401     assert(thread_fn);
402 
403     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
404     if (!thread) {
405         return NULL;
406     }
407     thread->thread_fn = thread_fn;
408     thread->user_data = user_data;
409 
410     pthread_attr_init(&attr);
411     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
412     if (pthread_create(&(thread->thread), &attr,
413                        opj_thread_callback_adapter, (void *) thread) != 0) {
414         opj_free(thread);
415         return NULL;
416     }
417     return thread;
418 }
419 
opj_thread_join(opj_thread_t * thread)420 void opj_thread_join(opj_thread_t* thread)
421 {
422     void* status;
423     pthread_join(thread->thread, &status);
424 
425     opj_free(thread);
426 }
427 
428 #else
429 /* Stub implementation */
430 
431 #include "opj_includes.h"
432 
opj_has_thread_support(void)433 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
434 {
435     return OPJ_FALSE;
436 }
437 
opj_get_num_cpus(void)438 int OPJ_CALLCONV opj_get_num_cpus(void)
439 {
440     return 1;
441 }
442 
opj_mutex_create(void)443 opj_mutex_t* opj_mutex_create(void)
444 {
445     return NULL;
446 }
447 
opj_mutex_lock(opj_mutex_t * mutex)448 void opj_mutex_lock(opj_mutex_t* mutex)
449 {
450     (void) mutex;
451 }
452 
opj_mutex_unlock(opj_mutex_t * mutex)453 void opj_mutex_unlock(opj_mutex_t* mutex)
454 {
455     (void) mutex;
456 }
457 
opj_mutex_destroy(opj_mutex_t * mutex)458 void opj_mutex_destroy(opj_mutex_t* mutex)
459 {
460     (void) mutex;
461 }
462 
opj_cond_create(void)463 opj_cond_t* opj_cond_create(void)
464 {
465     return NULL;
466 }
467 
opj_cond_wait(opj_cond_t * cond,opj_mutex_t * mutex)468 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
469 {
470     (void) cond;
471     (void) mutex;
472 }
473 
opj_cond_signal(opj_cond_t * cond)474 void opj_cond_signal(opj_cond_t* cond)
475 {
476     (void) cond;
477 }
478 
opj_cond_destroy(opj_cond_t * cond)479 void opj_cond_destroy(opj_cond_t* cond)
480 {
481     (void) cond;
482 }
483 
opj_thread_create(opj_thread_fn thread_fn,void * user_data)484 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
485 {
486     (void) thread_fn;
487     (void) user_data;
488     return NULL;
489 }
490 
opj_thread_join(opj_thread_t * thread)491 void opj_thread_join(opj_thread_t* thread)
492 {
493     (void) thread;
494 }
495 
496 #endif
497 
498 typedef struct {
499     int key;
500     void* value;
501     opj_tls_free_func opj_free_func;
502 } opj_tls_key_val_t;
503 
504 struct opj_tls_t {
505     opj_tls_key_val_t* key_val;
506     int                key_val_count;
507 };
508 
opj_tls_new(void)509 static opj_tls_t* opj_tls_new(void)
510 {
511     return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
512 }
513 
opj_tls_destroy(opj_tls_t * tls)514 static void opj_tls_destroy(opj_tls_t* tls)
515 {
516     int i;
517     if (!tls) {
518         return;
519     }
520     for (i = 0; i < tls->key_val_count; i++) {
521         if (tls->key_val[i].opj_free_func) {
522             tls->key_val[i].opj_free_func(tls->key_val[i].value);
523         }
524     }
525     opj_free(tls->key_val);
526     opj_free(tls);
527 }
528 
opj_tls_get(opj_tls_t * tls,int key)529 void* opj_tls_get(opj_tls_t* tls, int key)
530 {
531     int i;
532     for (i = 0; i < tls->key_val_count; i++) {
533         if (tls->key_val[i].key == key) {
534             return tls->key_val[i].value;
535         }
536     }
537     return NULL;
538 }
539 
opj_tls_set(opj_tls_t * tls,int key,void * value,opj_tls_free_func opj_free_func)540 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value,
541                      opj_tls_free_func opj_free_func)
542 {
543     opj_tls_key_val_t* new_key_val;
544     int i;
545 
546     if (tls->key_val_count == INT_MAX) {
547         return OPJ_FALSE;
548     }
549     for (i = 0; i < tls->key_val_count; i++) {
550         if (tls->key_val[i].key == key) {
551             if (tls->key_val[i].opj_free_func) {
552                 tls->key_val[i].opj_free_func(tls->key_val[i].value);
553             }
554             tls->key_val[i].value = value;
555             tls->key_val[i].opj_free_func = opj_free_func;
556             return OPJ_TRUE;
557         }
558     }
559     new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val,
560                   ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t));
561     if (!new_key_val) {
562         return OPJ_FALSE;
563     }
564     tls->key_val = new_key_val;
565     new_key_val[tls->key_val_count].key = key;
566     new_key_val[tls->key_val_count].value = value;
567     new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
568     tls->key_val_count ++;
569     return OPJ_TRUE;
570 }
571 
572 
573 typedef struct {
574     opj_job_fn          job_fn;
575     void               *user_data;
576 } opj_worker_thread_job_t;
577 
578 typedef struct {
579     opj_thread_pool_t   *tp;
580     opj_thread_t        *thread;
581     int                  marked_as_waiting;
582 
583     opj_mutex_t         *mutex;
584     opj_cond_t          *cond;
585 } opj_worker_thread_t;
586 
587 typedef enum {
588     OPJWTS_OK,
589     OPJWTS_STOP,
590     OPJWTS_ERROR
591 } opj_worker_thread_state;
592 
593 struct opj_job_list_t {
594     opj_worker_thread_job_t* job;
595     struct opj_job_list_t* next;
596 };
597 typedef struct opj_job_list_t opj_job_list_t;
598 
599 struct opj_worker_thread_list_t {
600     opj_worker_thread_t* worker_thread;
601     struct opj_worker_thread_list_t* next;
602 };
603 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
604 
605 struct opj_thread_pool_t {
606     opj_worker_thread_t*             worker_threads;
607     int                              worker_threads_count;
608     opj_cond_t*                      cond;
609     opj_mutex_t*                     mutex;
610     volatile opj_worker_thread_state state;
611     opj_job_list_t*                  job_queue;
612     volatile int                     pending_jobs_count;
613     opj_worker_thread_list_t*        waiting_worker_thread_list;
614     int                              waiting_worker_thread_count;
615     opj_tls_t*                       tls;
616     int                              signaling_threshold;
617 };
618 
619 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
620 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
621     opj_thread_pool_t* tp,
622     opj_worker_thread_t* worker_thread,
623     OPJ_BOOL signal_job_finished);
624 
opj_thread_pool_create(int num_threads)625 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
626 {
627     opj_thread_pool_t* tp;
628 
629     tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
630     if (!tp) {
631         return NULL;
632     }
633     tp->state = OPJWTS_OK;
634 
635     if (num_threads <= 0) {
636         tp->tls = opj_tls_new();
637         if (!tp->tls) {
638             opj_free(tp);
639             tp = NULL;
640         }
641         return tp;
642     }
643 
644     tp->mutex = opj_mutex_create();
645     if (!tp->mutex) {
646         opj_free(tp);
647         return NULL;
648     }
649     if (!opj_thread_pool_setup(tp, num_threads)) {
650         opj_thread_pool_destroy(tp);
651         return NULL;
652     }
653     return tp;
654 }
655 
opj_worker_thread_function(void * user_data)656 static void opj_worker_thread_function(void* user_data)
657 {
658     opj_worker_thread_t* worker_thread;
659     opj_thread_pool_t* tp;
660     opj_tls_t* tls;
661     OPJ_BOOL job_finished = OPJ_FALSE;
662 
663     worker_thread = (opj_worker_thread_t*) user_data;
664     tp = worker_thread->tp;
665     tls = opj_tls_new();
666 
667     while (OPJ_TRUE) {
668         opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread,
669                                        job_finished);
670         if (job == NULL) {
671             break;
672         }
673 
674         if (job->job_fn) {
675             job->job_fn(job->user_data, tls);
676         }
677         opj_free(job);
678         job_finished = OPJ_TRUE;
679     }
680 
681     opj_tls_destroy(tls);
682 }
683 
opj_thread_pool_setup(opj_thread_pool_t * tp,int num_threads)684 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
685 {
686     int i;
687     OPJ_BOOL bRet = OPJ_TRUE;
688 
689     assert(num_threads > 0);
690 
691     tp->cond = opj_cond_create();
692     if (tp->cond == NULL) {
693         return OPJ_FALSE;
694     }
695 
696     tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads,
697                          sizeof(opj_worker_thread_t));
698     if (tp->worker_threads == NULL) {
699         return OPJ_FALSE;
700     }
701     tp->worker_threads_count = num_threads;
702 
703     for (i = 0; i < num_threads; i++) {
704         tp->worker_threads[i].tp = tp;
705 
706         tp->worker_threads[i].mutex = opj_mutex_create();
707         if (tp->worker_threads[i].mutex == NULL) {
708             tp->worker_threads_count = i;
709             bRet = OPJ_FALSE;
710             break;
711         }
712 
713         tp->worker_threads[i].cond = opj_cond_create();
714         if (tp->worker_threads[i].cond == NULL) {
715             opj_mutex_destroy(tp->worker_threads[i].mutex);
716             tp->worker_threads_count = i;
717             bRet = OPJ_FALSE;
718             break;
719         }
720 
721         tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
722 
723         tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
724                                        &(tp->worker_threads[i]));
725         if (tp->worker_threads[i].thread == NULL) {
726             tp->worker_threads_count = i;
727             bRet = OPJ_FALSE;
728             break;
729         }
730     }
731 
732     /* Wait all threads to be started */
733     /* printf("waiting for all threads to be started\n"); */
734     opj_mutex_lock(tp->mutex);
735     while (tp->waiting_worker_thread_count < num_threads) {
736         opj_cond_wait(tp->cond, tp->mutex);
737     }
738     opj_mutex_unlock(tp->mutex);
739     /* printf("all threads started\n"); */
740 
741     if (tp->state == OPJWTS_ERROR) {
742         bRet = OPJ_FALSE;
743     }
744 
745     return bRet;
746 }
747 
748 /*
749 void opj_waiting()
750 {
751     printf("waiting!\n");
752 }
753 */
754 
opj_thread_pool_get_next_job(opj_thread_pool_t * tp,opj_worker_thread_t * worker_thread,OPJ_BOOL signal_job_finished)755 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
756     opj_thread_pool_t* tp,
757     opj_worker_thread_t* worker_thread,
758     OPJ_BOOL signal_job_finished)
759 {
760     while (OPJ_TRUE) {
761         opj_job_list_t* top_job_iter;
762 
763         opj_mutex_lock(tp->mutex);
764 
765         if (signal_job_finished) {
766             signal_job_finished = OPJ_FALSE;
767             tp->pending_jobs_count --;
768             /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
769             if (tp->pending_jobs_count <= tp->signaling_threshold) {
770                 opj_cond_signal(tp->cond);
771             }
772         }
773 
774         if (tp->state == OPJWTS_STOP) {
775             opj_mutex_unlock(tp->mutex);
776             return NULL;
777         }
778         top_job_iter = tp->job_queue;
779         if (top_job_iter) {
780             opj_worker_thread_job_t* job;
781             tp->job_queue = top_job_iter->next;
782 
783             job = top_job_iter->job;
784             opj_mutex_unlock(tp->mutex);
785             opj_free(top_job_iter);
786             return job;
787         }
788 
789         /* opj_waiting(); */
790         if (!worker_thread->marked_as_waiting) {
791             opj_worker_thread_list_t* item;
792 
793             worker_thread->marked_as_waiting = OPJ_TRUE;
794             tp->waiting_worker_thread_count ++;
795             assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
796 
797             item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
798             if (item == NULL) {
799                 tp->state = OPJWTS_ERROR;
800                 opj_cond_signal(tp->cond);
801 
802                 opj_mutex_unlock(tp->mutex);
803                 return NULL;
804             }
805 
806             item->worker_thread = worker_thread;
807             item->next = tp->waiting_worker_thread_list;
808             tp->waiting_worker_thread_list = item;
809         }
810 
811         /* printf("signaling that worker thread is ready\n"); */
812         opj_cond_signal(tp->cond);
813 
814         opj_mutex_lock(worker_thread->mutex);
815         opj_mutex_unlock(tp->mutex);
816 
817         /* printf("waiting for job\n"); */
818         opj_cond_wait(worker_thread->cond, worker_thread->mutex);
819 
820         opj_mutex_unlock(worker_thread->mutex);
821         /* printf("got job\n"); */
822     }
823 }
824 
opj_thread_pool_submit_job(opj_thread_pool_t * tp,opj_job_fn job_fn,void * user_data)825 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
826                                     opj_job_fn job_fn,
827                                     void* user_data)
828 {
829     opj_worker_thread_job_t* job;
830     opj_job_list_t* item;
831 
832     if (tp->mutex == NULL) {
833         job_fn(user_data, tp->tls);
834         return OPJ_TRUE;
835     }
836 
837     job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
838     if (job == NULL) {
839         return OPJ_FALSE;
840     }
841     job->job_fn = job_fn;
842     job->user_data = user_data;
843 
844     item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
845     if (item == NULL) {
846         opj_free(job);
847         return OPJ_FALSE;
848     }
849     item->job = job;
850 
851     opj_mutex_lock(tp->mutex);
852 
853     tp->signaling_threshold = 100 * tp->worker_threads_count;
854     while (tp->pending_jobs_count > tp->signaling_threshold) {
855         /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
856         opj_cond_wait(tp->cond, tp->mutex);
857         /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
858     }
859 
860     item->next = tp->job_queue;
861     tp->job_queue = item;
862     tp->pending_jobs_count ++;
863 
864     if (tp->waiting_worker_thread_list) {
865         opj_worker_thread_t* worker_thread;
866         opj_worker_thread_list_t* next;
867         opj_worker_thread_list_t* to_opj_free;
868 
869         worker_thread = tp->waiting_worker_thread_list->worker_thread;
870 
871         assert(worker_thread->marked_as_waiting);
872         worker_thread->marked_as_waiting = OPJ_FALSE;
873 
874         next = tp->waiting_worker_thread_list->next;
875         to_opj_free = tp->waiting_worker_thread_list;
876         tp->waiting_worker_thread_list = next;
877         tp->waiting_worker_thread_count --;
878 
879         opj_mutex_lock(worker_thread->mutex);
880         opj_mutex_unlock(tp->mutex);
881         opj_cond_signal(worker_thread->cond);
882         opj_mutex_unlock(worker_thread->mutex);
883 
884         opj_free(to_opj_free);
885     } else {
886         opj_mutex_unlock(tp->mutex);
887     }
888 
889     return OPJ_TRUE;
890 }
891 
opj_thread_pool_wait_completion(opj_thread_pool_t * tp,int max_remaining_jobs)892 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp,
893                                      int max_remaining_jobs)
894 {
895     if (tp->mutex == NULL) {
896         return;
897     }
898 
899     if (max_remaining_jobs < 0) {
900         max_remaining_jobs = 0;
901     }
902     opj_mutex_lock(tp->mutex);
903     tp->signaling_threshold = max_remaining_jobs;
904     while (tp->pending_jobs_count > max_remaining_jobs) {
905         /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
906         opj_cond_wait(tp->cond, tp->mutex);
907         /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
908     }
909     opj_mutex_unlock(tp->mutex);
910 }
911 
opj_thread_pool_get_thread_count(opj_thread_pool_t * tp)912 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
913 {
914     return tp->worker_threads_count;
915 }
916 
opj_thread_pool_destroy(opj_thread_pool_t * tp)917 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
918 {
919     if (!tp) {
920         return;
921     }
922     if (tp->cond) {
923         int i;
924         opj_thread_pool_wait_completion(tp, 0);
925 
926         opj_mutex_lock(tp->mutex);
927         tp->state = OPJWTS_STOP;
928         opj_mutex_unlock(tp->mutex);
929 
930         for (i = 0; i < tp->worker_threads_count; i++) {
931             opj_mutex_lock(tp->worker_threads[i].mutex);
932             opj_cond_signal(tp->worker_threads[i].cond);
933             opj_mutex_unlock(tp->worker_threads[i].mutex);
934             opj_thread_join(tp->worker_threads[i].thread);
935             opj_cond_destroy(tp->worker_threads[i].cond);
936             opj_mutex_destroy(tp->worker_threads[i].mutex);
937         }
938 
939         opj_free(tp->worker_threads);
940 
941         while (tp->waiting_worker_thread_list != NULL) {
942             opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
943             opj_free(tp->waiting_worker_thread_list);
944             tp->waiting_worker_thread_list = next;
945         }
946 
947         opj_cond_destroy(tp->cond);
948     }
949     opj_mutex_destroy(tp->mutex);
950     opj_tls_destroy(tp->tls);
951     opj_free(tp);
952 }
953