Lines Matching full:queue
35 static void util_queue_killall_and_wait(struct util_queue *queue);
69 add_to_atexit_list(struct util_queue *queue) in add_to_atexit_list() argument
74 LIST_ADD(&queue->head, &queue_list); in add_to_atexit_list()
79 remove_from_atexit_list(struct util_queue *queue) in remove_from_atexit_list() argument
85 if (iter == queue) { in remove_from_atexit_list()
229 struct util_queue *queue; member
236 struct util_queue *queue = ((struct thread_input*)input)->queue; in util_queue_thread_func() local
241 if (queue->name) { in util_queue_thread_func()
243 util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index); in util_queue_thread_func()
250 mtx_lock(&queue->lock); in util_queue_thread_func()
251 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); in util_queue_thread_func()
253 /* wait if the queue is empty */ in util_queue_thread_func()
254 while (!queue->kill_threads && queue->num_queued == 0) in util_queue_thread_func()
255 cnd_wait(&queue->has_queued_cond, &queue->lock); in util_queue_thread_func()
257 if (queue->kill_threads) { in util_queue_thread_func()
258 mtx_unlock(&queue->lock); in util_queue_thread_func()
262 job = queue->jobs[queue->read_idx]; in util_queue_thread_func()
263 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); in util_queue_thread_func()
264 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; in util_queue_thread_func()
266 queue->num_queued--; in util_queue_thread_func()
267 cnd_signal(&queue->has_space_cond); in util_queue_thread_func()
268 mtx_unlock(&queue->lock); in util_queue_thread_func()
279 mtx_lock(&queue->lock); in util_queue_thread_func()
280 for (unsigned i = queue->read_idx; i != queue->write_idx; in util_queue_thread_func()
281 i = (i + 1) % queue->max_jobs) { in util_queue_thread_func()
282 if (queue->jobs[i].job) { in util_queue_thread_func()
283 util_queue_fence_signal(queue->jobs[i].fence); in util_queue_thread_func()
284 queue->jobs[i].job = NULL; in util_queue_thread_func()
287 queue->read_idx = queue->write_idx; in util_queue_thread_func()
288 queue->num_queued = 0; in util_queue_thread_func()
289 mtx_unlock(&queue->lock); in util_queue_thread_func()
294 util_queue_init(struct util_queue *queue, in util_queue_init() argument
302 memset(queue, 0, sizeof(*queue)); in util_queue_init()
303 queue->name = name; in util_queue_init()
304 queue->flags = flags; in util_queue_init()
305 queue->num_threads = num_threads; in util_queue_init()
306 queue->max_jobs = max_jobs; in util_queue_init()
308 queue->jobs = (struct util_queue_job*) in util_queue_init()
310 if (!queue->jobs) in util_queue_init()
313 (void) mtx_init(&queue->lock, mtx_plain); in util_queue_init()
314 (void) mtx_init(&queue->finish_lock, mtx_plain); in util_queue_init()
316 queue->num_queued = 0; in util_queue_init()
317 cnd_init(&queue->has_queued_cond); in util_queue_init()
318 cnd_init(&queue->has_space_cond); in util_queue_init()
320 queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t)); in util_queue_init()
321 if (!queue->threads) in util_queue_init()
328 input->queue = queue; in util_queue_init()
331 queue->threads[i] = u_thread_create(util_queue_thread_func, input); in util_queue_init()
333 if (!queue->threads[i]) { in util_queue_init()
341 queue->num_threads = i; in util_queue_init()
356 pthread_setschedparam(queue->threads[i], SCHED_IDLE, &sched_param); in util_queue_init()
361 add_to_atexit_list(queue); in util_queue_init()
365 free(queue->threads); in util_queue_init()
367 if (queue->jobs) { in util_queue_init()
368 cnd_destroy(&queue->has_space_cond); in util_queue_init()
369 cnd_destroy(&queue->has_queued_cond); in util_queue_init()
370 mtx_destroy(&queue->lock); in util_queue_init()
371 free(queue->jobs); in util_queue_init()
374 memset(queue, 0, sizeof(*queue)); in util_queue_init()
379 util_queue_killall_and_wait(struct util_queue *queue) in util_queue_killall_and_wait() argument
384 mtx_lock(&queue->lock); in util_queue_killall_and_wait()
385 queue->kill_threads = 1; in util_queue_killall_and_wait()
386 cnd_broadcast(&queue->has_queued_cond); in util_queue_killall_and_wait()
387 mtx_unlock(&queue->lock); in util_queue_killall_and_wait()
389 for (i = 0; i < queue->num_threads; i++) in util_queue_killall_and_wait()
390 thrd_join(queue->threads[i], NULL); in util_queue_killall_and_wait()
391 queue->num_threads = 0; in util_queue_killall_and_wait()
395 util_queue_destroy(struct util_queue *queue) in util_queue_destroy() argument
397 util_queue_killall_and_wait(queue); in util_queue_destroy()
398 remove_from_atexit_list(queue); in util_queue_destroy()
400 cnd_destroy(&queue->has_space_cond); in util_queue_destroy()
401 cnd_destroy(&queue->has_queued_cond); in util_queue_destroy()
402 mtx_destroy(&queue->finish_lock); in util_queue_destroy()
403 mtx_destroy(&queue->lock); in util_queue_destroy()
404 free(queue->jobs); in util_queue_destroy()
405 free(queue->threads); in util_queue_destroy()
409 util_queue_add_job(struct util_queue *queue, in util_queue_add_job() argument
417 mtx_lock(&queue->lock); in util_queue_add_job()
418 if (queue->kill_threads) { in util_queue_add_job()
419 mtx_unlock(&queue->lock); in util_queue_add_job()
428 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); in util_queue_add_job()
430 if (queue->num_queued == queue->max_jobs) { in util_queue_add_job()
431 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL) { in util_queue_add_job()
432 /* If the queue is full, make it larger to avoid waiting for a free in util_queue_add_job()
435 unsigned new_max_jobs = queue->max_jobs + 8; in util_queue_add_job()
443 unsigned i = queue->read_idx; in util_queue_add_job()
446 jobs[num_jobs++] = queue->jobs[i]; in util_queue_add_job()
447 i = (i + 1) % queue->max_jobs; in util_queue_add_job()
448 } while (i != queue->write_idx); in util_queue_add_job()
450 assert(num_jobs == queue->num_queued); in util_queue_add_job()
452 free(queue->jobs); in util_queue_add_job()
453 queue->jobs = jobs; in util_queue_add_job()
454 queue->read_idx = 0; in util_queue_add_job()
455 queue->write_idx = num_jobs; in util_queue_add_job()
456 queue->max_jobs = new_max_jobs; in util_queue_add_job()
459 while (queue->num_queued == queue->max_jobs) in util_queue_add_job()
460 cnd_wait(&queue->has_space_cond, &queue->lock); in util_queue_add_job()
464 ptr = &queue->jobs[queue->write_idx]; in util_queue_add_job()
470 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; in util_queue_add_job()
472 queue->num_queued++; in util_queue_add_job()
473 cnd_signal(&queue->has_queued_cond); in util_queue_add_job()
474 mtx_unlock(&queue->lock); in util_queue_add_job()
479 * the queue. If the job has started execution, the function waits for it to
488 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) in util_queue_drop_job() argument
495 mtx_lock(&queue->lock); in util_queue_drop_job()
496 for (unsigned i = queue->read_idx; i != queue->write_idx; in util_queue_drop_job()
497 i = (i + 1) % queue->max_jobs) { in util_queue_drop_job()
498 if (queue->jobs[i].fence == fence) { in util_queue_drop_job()
499 if (queue->jobs[i].cleanup) in util_queue_drop_job()
500 queue->jobs[i].cleanup(queue->jobs[i].job, -1); in util_queue_drop_job()
503 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); in util_queue_drop_job()
508 mtx_unlock(&queue->lock); in util_queue_drop_job()
527 util_queue_finish(struct util_queue *queue) in util_queue_finish() argument
530 struct util_queue_fence *fences = malloc(queue->num_threads * sizeof(*fences)); in util_queue_finish()
532 util_barrier_init(&barrier, queue->num_threads); in util_queue_finish()
538 mtx_lock(&queue->finish_lock); in util_queue_finish()
540 for (unsigned i = 0; i < queue->num_threads; ++i) { in util_queue_finish()
542 util_queue_add_job(queue, &barrier, &fences[i], util_queue_finish_execute, NULL); in util_queue_finish()
545 for (unsigned i = 0; i < queue->num_threads; ++i) { in util_queue_finish()
549 mtx_unlock(&queue->finish_lock); in util_queue_finish()
557 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) in util_queue_get_thread_time_nano() argument
560 if (thread_index >= queue->num_threads) in util_queue_get_thread_time_nano()
563 return u_thread_get_time_nano(queue->threads[thread_index]); in util_queue_get_thread_time_nano()