1 /*
2  * Copyright © 2016 Advanced Micro Devices, Inc.
3  * All Rights Reserved.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining
6  * a copy of this software and associated documentation files (the
7  * "Software"), to deal in the Software without restriction, including
8  * without limitation the rights to use, copy, modify, merge, publish,
9  * distribute, sub license, and/or sell copies of the Software, and to
10  * permit persons to whom the Software is furnished to do so, subject to
11  * the following conditions:
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
14  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
15  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
16  * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
17  * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
19  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20  * USE OR OTHER DEALINGS IN THE SOFTWARE.
21  *
22  * The above copyright notice and this permission notice (including the
23  * next paragraph) shall be included in all copies or substantial portions
24  * of the Software.
25  */
26 
27 #include "u_queue.h"
28 #include "u_memory.h"
29 #include "u_string.h"
30 #include "os/os_time.h"
31 
32 static void util_queue_killall_and_wait(struct util_queue *queue);
33 
34 /****************************************************************************
35  * Wait for all queues to assert idle when exit() is called.
36  *
37  * Otherwise, C++ static variable destructors can be called while threads
38  * are using the static variables.
39  */
40 
/* Ensures global_init() runs only once (see the call_once() in
 * add_to_atexit_list()).
 */
static once_flag atexit_once_flag = ONCE_FLAG_INIT;
/* List of queues to stop at exit; entries are linked through their "head"
 * member.
 */
static struct list_head queue_list;
/* Held around every traversal, insertion and removal on queue_list. */
pipe_static_mutex(exit_mutex);
44 
45 static void
atexit_handler(void)46 atexit_handler(void)
47 {
48    struct util_queue *iter;
49 
50    pipe_mutex_lock(exit_mutex);
51    /* Wait for all queues to assert idle. */
52    LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
53       util_queue_killall_and_wait(iter);
54    }
55    pipe_mutex_unlock(exit_mutex);
56 }
57 
58 static void
global_init(void)59 global_init(void)
60 {
61    LIST_INITHEAD(&queue_list);
62    atexit(atexit_handler);
63 }
64 
65 static void
add_to_atexit_list(struct util_queue * queue)66 add_to_atexit_list(struct util_queue *queue)
67 {
68    call_once(&atexit_once_flag, global_init);
69 
70    pipe_mutex_lock(exit_mutex);
71    LIST_ADD(&queue->head, &queue_list);
72    pipe_mutex_unlock(exit_mutex);
73 }
74 
75 static void
remove_from_atexit_list(struct util_queue * queue)76 remove_from_atexit_list(struct util_queue *queue)
77 {
78    struct util_queue *iter, *tmp;
79 
80    pipe_mutex_lock(exit_mutex);
81    LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
82       if (iter == queue) {
83          LIST_DEL(&iter->head);
84          break;
85       }
86    }
87    pipe_mutex_unlock(exit_mutex);
88 }
89 
90 /****************************************************************************
91  * util_queue implementation
92  */
93 
94 static void
util_queue_fence_signal(struct util_queue_fence * fence)95 util_queue_fence_signal(struct util_queue_fence *fence)
96 {
97    pipe_mutex_lock(fence->mutex);
98    fence->signalled = true;
99    pipe_condvar_broadcast(fence->cond);
100    pipe_mutex_unlock(fence->mutex);
101 }
102 
103 void
util_queue_job_wait(struct util_queue_fence * fence)104 util_queue_job_wait(struct util_queue_fence *fence)
105 {
106    pipe_mutex_lock(fence->mutex);
107    while (!fence->signalled)
108       pipe_condvar_wait(fence->cond, fence->mutex);
109    pipe_mutex_unlock(fence->mutex);
110 }
111 
/* Start-up arguments handed to util_queue_thread_func(). One instance is
 * allocated per thread by util_queue_init() and freed by the thread itself.
 */
struct thread_input {
   struct util_queue *queue;   /* queue the thread takes jobs from */
   int thread_index;           /* index of the thread within the queue */
};
116 
PIPE_THREAD_ROUTINE(util_queue_thread_func,input)117 static PIPE_THREAD_ROUTINE(util_queue_thread_func, input)
118 {
119    struct util_queue *queue = ((struct thread_input*)input)->queue;
120    int thread_index = ((struct thread_input*)input)->thread_index;
121 
122    FREE(input);
123 
124    if (queue->name) {
125       char name[16];
126       util_snprintf(name, sizeof(name), "%s:%i", queue->name, thread_index);
127       pipe_thread_setname(name);
128    }
129 
130    while (1) {
131       struct util_queue_job job;
132 
133       pipe_mutex_lock(queue->lock);
134       assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
135 
136       /* wait if the queue is empty */
137       while (!queue->kill_threads && queue->num_queued == 0)
138          pipe_condvar_wait(queue->has_queued_cond, queue->lock);
139 
140       if (queue->kill_threads) {
141          pipe_mutex_unlock(queue->lock);
142          break;
143       }
144 
145       job = queue->jobs[queue->read_idx];
146       memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
147       queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
148 
149       queue->num_queued--;
150       pipe_condvar_signal(queue->has_space_cond);
151       pipe_mutex_unlock(queue->lock);
152 
153       if (job.job) {
154          job.execute(job.job, thread_index);
155          util_queue_fence_signal(job.fence);
156          if (job.cleanup)
157             job.cleanup(job.job, thread_index);
158       }
159    }
160 
161    /* signal remaining jobs before terminating */
162    pipe_mutex_lock(queue->lock);
163    while (queue->jobs[queue->read_idx].job) {
164       util_queue_fence_signal(queue->jobs[queue->read_idx].fence);
165 
166       queue->jobs[queue->read_idx].job = NULL;
167       queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;
168    }
169    queue->num_queued = 0; /* reset this when exiting the thread */
170    pipe_mutex_unlock(queue->lock);
171    return 0;
172 }
173 
174 bool
util_queue_init(struct util_queue * queue,const char * name,unsigned max_jobs,unsigned num_threads)175 util_queue_init(struct util_queue *queue,
176                 const char *name,
177                 unsigned max_jobs,
178                 unsigned num_threads)
179 {
180    unsigned i;
181 
182    memset(queue, 0, sizeof(*queue));
183    queue->name = name;
184    queue->num_threads = num_threads;
185    queue->max_jobs = max_jobs;
186 
187    queue->jobs = (struct util_queue_job*)
188                  CALLOC(max_jobs, sizeof(struct util_queue_job));
189    if (!queue->jobs)
190       goto fail;
191 
192    pipe_mutex_init(queue->lock);
193 
194    queue->num_queued = 0;
195    pipe_condvar_init(queue->has_queued_cond);
196    pipe_condvar_init(queue->has_space_cond);
197 
198    queue->threads = (pipe_thread*)CALLOC(num_threads, sizeof(pipe_thread));
199    if (!queue->threads)
200       goto fail;
201 
202    /* start threads */
203    for (i = 0; i < num_threads; i++) {
204       struct thread_input *input = MALLOC_STRUCT(thread_input);
205       input->queue = queue;
206       input->thread_index = i;
207 
208       queue->threads[i] = pipe_thread_create(util_queue_thread_func, input);
209 
210       if (!queue->threads[i]) {
211          FREE(input);
212 
213          if (i == 0) {
214             /* no threads created, fail */
215             goto fail;
216          } else {
217             /* at least one thread created, so use it */
218             queue->num_threads = i;
219             break;
220          }
221       }
222    }
223 
224    add_to_atexit_list(queue);
225    return true;
226 
227 fail:
228    FREE(queue->threads);
229 
230    if (queue->jobs) {
231       pipe_condvar_destroy(queue->has_space_cond);
232       pipe_condvar_destroy(queue->has_queued_cond);
233       pipe_mutex_destroy(queue->lock);
234       FREE(queue->jobs);
235    }
236    /* also util_queue_is_initialized can be used to check for success */
237    memset(queue, 0, sizeof(*queue));
238    return false;
239 }
240 
241 static void
util_queue_killall_and_wait(struct util_queue * queue)242 util_queue_killall_and_wait(struct util_queue *queue)
243 {
244    unsigned i;
245 
246    /* Signal all threads to terminate. */
247    pipe_mutex_lock(queue->lock);
248    queue->kill_threads = 1;
249    pipe_condvar_broadcast(queue->has_queued_cond);
250    pipe_mutex_unlock(queue->lock);
251 
252    for (i = 0; i < queue->num_threads; i++)
253       pipe_thread_wait(queue->threads[i]);
254    queue->num_threads = 0;
255 }
256 
257 void
util_queue_destroy(struct util_queue * queue)258 util_queue_destroy(struct util_queue *queue)
259 {
260    util_queue_killall_and_wait(queue);
261    remove_from_atexit_list(queue);
262 
263    pipe_condvar_destroy(queue->has_space_cond);
264    pipe_condvar_destroy(queue->has_queued_cond);
265    pipe_mutex_destroy(queue->lock);
266    FREE(queue->jobs);
267    FREE(queue->threads);
268 }
269 
270 void
util_queue_fence_init(struct util_queue_fence * fence)271 util_queue_fence_init(struct util_queue_fence *fence)
272 {
273    memset(fence, 0, sizeof(*fence));
274    pipe_mutex_init(fence->mutex);
275    pipe_condvar_init(fence->cond);
276    fence->signalled = true;
277 }
278 
279 void
util_queue_fence_destroy(struct util_queue_fence * fence)280 util_queue_fence_destroy(struct util_queue_fence *fence)
281 {
282    assert(fence->signalled);
283    pipe_condvar_destroy(fence->cond);
284    pipe_mutex_destroy(fence->mutex);
285 }
286 
287 void
util_queue_add_job(struct util_queue * queue,void * job,struct util_queue_fence * fence,util_queue_execute_func execute,util_queue_execute_func cleanup)288 util_queue_add_job(struct util_queue *queue,
289                    void *job,
290                    struct util_queue_fence *fence,
291                    util_queue_execute_func execute,
292                    util_queue_execute_func cleanup)
293 {
294    struct util_queue_job *ptr;
295 
296    assert(fence->signalled);
297    fence->signalled = false;
298 
299    pipe_mutex_lock(queue->lock);
300    assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);
301 
302    /* if the queue is full, wait until there is space */
303    while (queue->num_queued == queue->max_jobs)
304       pipe_condvar_wait(queue->has_space_cond, queue->lock);
305 
306    ptr = &queue->jobs[queue->write_idx];
307    assert(ptr->job == NULL);
308    ptr->job = job;
309    ptr->fence = fence;
310    ptr->execute = execute;
311    ptr->cleanup = cleanup;
312    queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
313 
314    queue->num_queued++;
315    pipe_condvar_signal(queue->has_queued_cond);
316    pipe_mutex_unlock(queue->lock);
317 }
318