/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* Define 256MB */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list = {
   .next = &queue_list,
   .prev = &queue_list,
};
static mtx_t exit_mutex;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   mtx_init(&exit_mutex, mtx_plain);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif
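
/* Note on timeouts: abs_timeout for the fence wait functions is an absolute
 * deadline in the os_time_get_nano() time domain (the interface is meant to
 * be consistent with os_time_*; see the comment in the standard
 * implementation below).  A caller with a relative timeout forms the
 * deadline itself, e.g. (timeout_ns is a hypothetical caller-chosen value):
 *
 *    int64_t abs_timeout = os_time_get_nano() + timeout_ns;
 *    bool signalled = util_queue_fence_wait_timeout(fence, abs_timeout);
 *
 * where util_queue_fence_wait_timeout() is the wrapper declared in u_queue.h
 * around the _util_queue_fence_wait_timeout() implementations here.
 */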

#ifdef UTIL_QUEUE_FENCE_STANDARD
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

      timespec_get(&ts, TIME_UTC);

      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif
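
/* A minimal fence lifecycle sketch using the functions above (my_job,
 * my_execute and queue are hypothetical caller-side names):
 *
 *    struct util_queue_fence fence;
 *    util_queue_fence_init(&fence);              // fence starts signalled
 *    util_queue_add_job(queue, &my_job, &fence,
 *                       my_execute, NULL, 0);    // resets the fence, queues the job
 *    ...
 *    util_queue_fence_wait(&fence);              // returns once my_execute has run
 *    util_queue_fence_destroy(&fence);
 *
 * util_queue_add_job() resets the fence itself (util_queue_fence_reset in
 * util_queue_add_job_locked below), so callers only init, wait and destroy.
 */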

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

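/**
 * Set the number of worker threads to num_threads, clamped to
 * [1, queue->max_threads].  Growing the pool spawns new threads; shrinking it
 * wakes the excess threads so they terminate (see util_queue_kill_threads).
 * Pass locked = true when the caller already holds queue->lock.
 */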
void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads,
                              bool locked)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   if (!locked)
      mtx_lock(&queue->lock);

   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because a new thread terminates
    * as soon as it sees thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }

   if (!locked)
      mtx_unlock(&queue->lock);
}

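/**
 * Initialize a queue with a ring buffer of max_jobs entries and start the
 * first worker thread; further threads are created on demand, up to
 * num_threads.  On failure the queue is zeroed and false is returned, so
 * util_queue_is_initialized() can also be used to detect the failure.
 *
 * Minimal usage sketch ("shcomp" and screen are illustrative placeholders):
 *
 *    struct util_queue q;
 *    if (!util_queue_init(&q, "shcomp", 32, 4,
 *                         UTIL_QUEUE_INIT_RESIZE_IF_FULL, screen))
 *       return false;
 *    ...add jobs, wait on fences...
 *    util_queue_destroy(&q);   // joins the workers and frees queue storage
 */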
bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->create_threads_on_demand = true;
   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = 1;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Zero the struct so that util_queue_is_initialized() can also be used
    * to check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked)
{
   /* Signal all threads to terminate. */
   if (!locked)
      mtx_lock(&queue->lock);

   if (keep_num_threads >= queue->num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);

   /* Wait for threads to terminate. */
   if (keep_num_threads < old_num_threads) {
      /* We need to unlock the mutex to allow threads to terminate. */
      mtx_unlock(&queue->lock);
      for (unsigned i = keep_num_threads; i < old_num_threads; i++)
         thrd_join(queue->threads[i], NULL);
      if (locked)
         mtx_lock(&queue->lock);
   } else {
      if (!locked)
         mtx_unlock(&queue->lock);
   }
}

static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

static void
util_queue_add_job_locked(struct util_queue *queue,
                          void *job,
                          struct util_queue_fence *fence,
                          util_queue_execute_func execute,
                          util_queue_execute_func cleanup,
                          const size_t job_size,
                          bool locked)
{
   struct util_queue_job *ptr;

   if (!locked)
      mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      if (!locked)
         mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be short-lived
       * because things are shutting down anyway.
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->create_threads_on_demand &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1, true);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   if (!locked)
      mtx_unlock(&queue->lock);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   util_queue_add_job_locked(queue, job, fence, execute, cleanup, job_size,
                             false);
}
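
/* Minimal example of defining and submitting a job (the my_job type and the
 * callbacks below are hypothetical, shown only to illustrate the expected
 * signatures):
 *
 *    struct my_job {
 *       struct util_queue_fence fence;
 *       ...payload...
 *    };
 *
 *    static void my_job_execute(void *job, void *gdata, int thread_index)
 *    {
 *       struct my_job *j = job;   // gdata is queue->global_data
 *       ...do the work...
 *    }
 *
 *    util_queue_fence_init(&j->fence);
 *    util_queue_add_job(queue, j, &j->fence, my_job_execute, NULL, 0);
 *    ...
 *    util_queue_fence_wait(&j->fence);   // or util_queue_drop_job()
 *
 * Passing a non-zero job_size only matters with UTIL_QUEUE_INIT_RESIZE_IF_FULL,
 * where total_jobs_size is capped at S_256MB before the queue stops growing.
 */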

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}
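
/* Typical use: when tearing down an object whose job may still be queued,
 * e.g. (sctx and shader are hypothetical caller-side names):
 *
 *    util_queue_drop_job(&sctx->compile_queue, &shader->ready);
 *
 * after which shader can be freed regardless of whether the job ever ran.
 */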

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

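   /* Implementation note: one barrier job is submitted per worker thread.
    * A worker can only reach the barrier after it has drained every job that
    * was queued before the barrier jobs, so once all fences below are
    * signalled, every previously added job has completed.
    */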
   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      mtx_unlock(&queue->lock);
      return;
   }

   /* We need to disable adding new threads in util_queue_add_job because
    * the finish operation requires a fixed number of threads.
    *
    * Also note that util_queue_add_job can unlock the mutex if there is not
    * enough space in the queue and wait for space.
    */
   queue->create_threads_on_demand = false;

   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job_locked(queue, &barrier, &fences[i],
                                util_queue_finish_execute, NULL, 0, true);
   }
   queue->create_threads_on_demand = true;
   mtx_unlock(&queue->lock);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}