1 /*
2  * Copyright 2016 Patrick Rudolph <siro@das-labor.org>
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a
5  * copy of this software and associated documentation files (the "Software"),
6  * to deal in the Software without restriction, including without limitation
7  * on the rights to use, copy, modify, merge, publish, distribute, sub
8  * license, and/or sell copies of the Software, and to permit persons to whom
9  * the Software is furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice (including the next
12  * paragraph) shall be included in all copies or substantial portions of the
13  * Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL
18  * THE AUTHOR(S) AND/OR THEIR SUPPLIERS BE LIABLE FOR ANY CLAIM,
19  * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
20  * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
21  * USE OR OTHER DEALINGS IN THE SOFTWARE. */
22 
23 #include "nine_queue.h"
24 #include "os/os_thread.h"
25 #include "util/macros.h"
26 #include "nine_helpers.h"
27 
28 #define NINE_CMD_BUF_INSTR (256)
29 
30 #define NINE_CMD_BUFS (32)
31 #define NINE_CMD_BUFS_MASK (NINE_CMD_BUFS - 1)
32 
33 #define NINE_QUEUE_SIZE (8192 * 16 + 128)
34 
35 #define DBG_CHANNEL DBG_DEVICE
36 
37 /*
38  * Single producer - single consumer pool queue
39  *
40  * Producer:
41  * Calls nine_queue_alloc to get a slice of memory in current cmdbuf.
42  * Calls nine_queue_flush to flush the queue by request.
43  * The queue is flushed automatically on insufficient space or once the
44  * cmdbuf contains NINE_CMD_BUF_INSTR instructions.
45  *
46  * nine_queue_flush does block, while nine_queue_alloc doesn't block.
47  *
48  * nine_queue_alloc returns NULL on insufficent space.
49  *
50  * Consumer:
51  * Calls nine_queue_wait_flush to wait for a cmdbuf.
52  * After waiting for a cmdbuf it calls nine_queue_get until NULL is returned.
53  *
54  * nine_queue_wait_flush does block, while nine_queue_get doesn't block.
55  *
56  * Constrains:
57  * Only a single consumer and a single producer are supported.
58  *
59  */
60 
61 struct nine_cmdbuf {
62     unsigned instr_size[NINE_CMD_BUF_INSTR];
63     unsigned num_instr;
64     unsigned offset;
65     void *mem_pool;
66     BOOL full;
67 };
68 
69 struct nine_queue_pool {
70     struct nine_cmdbuf pool[NINE_CMD_BUFS];
71     unsigned head;
72     unsigned tail;
73     unsigned cur_instr;
74     BOOL worker_wait;
75     cnd_t event_pop;
76     cnd_t event_push;
77     mtx_t mutex_pop;
78     mtx_t mutex_push;
79 };
80 
81 /* Consumer functions: */
82 void
nine_queue_wait_flush(struct nine_queue_pool * ctx)83 nine_queue_wait_flush(struct nine_queue_pool* ctx)
84 {
85     struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->tail];
86 
87     /* wait for cmdbuf full */
88     mtx_lock(&ctx->mutex_push);
89     while (!cmdbuf->full)
90     {
91         DBG("waiting for full cmdbuf\n");
92         cnd_wait(&ctx->event_push, &ctx->mutex_push);
93     }
94     DBG("got cmdbuf=%p\n", cmdbuf);
95     mtx_unlock(&ctx->mutex_push);
96 
97     cmdbuf->offset = 0;
98     ctx->cur_instr = 0;
99 }
100 
101 /* Gets a pointer to the next memory slice.
102  * Does not block.
103  * Returns NULL on empty cmdbuf. */
104 void *
nine_queue_get(struct nine_queue_pool * ctx)105 nine_queue_get(struct nine_queue_pool* ctx)
106 {
107     struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->tail];
108     unsigned offset;
109 
110     /* At this pointer there's always a cmdbuf. */
111 
112     if (ctx->cur_instr == cmdbuf->num_instr) {
113         /* signal waiting producer */
114         mtx_lock(&ctx->mutex_pop);
115         DBG("freeing cmdbuf=%p\n", cmdbuf);
116         cmdbuf->full = 0;
117         cnd_signal(&ctx->event_pop);
118         mtx_unlock(&ctx->mutex_pop);
119 
120         ctx->tail = (ctx->tail + 1) & NINE_CMD_BUFS_MASK;
121 
122         return NULL;
123     }
124 
125     /* At this pointer there's always a cmdbuf with instruction to process. */
126     offset = cmdbuf->offset;
127     cmdbuf->offset += cmdbuf->instr_size[ctx->cur_instr];
128     ctx->cur_instr ++;
129 
130     return cmdbuf->mem_pool + offset;
131 }
132 
133 /* Producer functions: */
134 
135 /* Flushes the queue.
136  * Moves the current cmdbuf to worker thread.
137  * Blocks until next cmdbuf is free. */
138 void
nine_queue_flush(struct nine_queue_pool * ctx)139 nine_queue_flush(struct nine_queue_pool* ctx)
140 {
141     struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
142 
143     DBG("flushing cmdbuf=%p instr=%d size=%d\n",
144            cmdbuf, cmdbuf->num_instr, cmdbuf->offset);
145 
146     /* Nothing to flush */
147     if (!cmdbuf->num_instr)
148         return;
149 
150     /* signal waiting worker */
151     mtx_lock(&ctx->mutex_push);
152     cmdbuf->full = 1;
153     cnd_signal(&ctx->event_push);
154     mtx_unlock(&ctx->mutex_push);
155 
156     ctx->head = (ctx->head + 1) & NINE_CMD_BUFS_MASK;
157 
158     cmdbuf = &ctx->pool[ctx->head];
159 
160     /* wait for queue empty */
161     mtx_lock(&ctx->mutex_pop);
162     while (cmdbuf->full)
163     {
164         DBG("waiting for empty cmdbuf\n");
165         cnd_wait(&ctx->event_pop, &ctx->mutex_pop);
166     }
167     DBG("got empty cmdbuf=%p\n", cmdbuf);
168     mtx_unlock(&ctx->mutex_pop);
169     cmdbuf->offset = 0;
170     cmdbuf->num_instr = 0;
171 }
172 
173 /* Gets a a pointer to slice of memory with size @space.
174  * Does block if queue is full.
175  * Returns NULL on @space > NINE_QUEUE_SIZE. */
176 void *
nine_queue_alloc(struct nine_queue_pool * ctx,unsigned space)177 nine_queue_alloc(struct nine_queue_pool* ctx, unsigned space)
178 {
179     unsigned offset;
180     struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
181 
182     if (space > NINE_QUEUE_SIZE)
183         return NULL;
184 
185     /* at this pointer there's always a free queue available */
186 
187     if ((cmdbuf->offset + space > NINE_QUEUE_SIZE) ||
188         (cmdbuf->num_instr == NINE_CMD_BUF_INSTR)) {
189 
190         nine_queue_flush(ctx);
191 
192         cmdbuf = &ctx->pool[ctx->head];
193     }
194 
195     DBG("cmdbuf=%p space=%d\n", cmdbuf, space);
196 
197     /* at this pointer there's always a free queue with sufficient space available */
198 
199     offset = cmdbuf->offset;
200     cmdbuf->offset += space;
201     cmdbuf->instr_size[cmdbuf->num_instr] = space;
202     cmdbuf->num_instr ++;
203 
204     return cmdbuf->mem_pool + offset;
205 }
206 
207 /* Returns the current queue flush state.
208  * TRUE nothing flushed
209  * FALSE one ore more instructions queued flushed. */
210 bool
nine_queue_no_flushed_work(struct nine_queue_pool * ctx)211 nine_queue_no_flushed_work(struct nine_queue_pool* ctx)
212 {
213     return (ctx->tail == ctx->head);
214 }
215 
216 /* Returns the current queue empty state.
217  * TRUE no instructions queued.
218  * FALSE one ore more instructions queued. */
219 bool
nine_queue_isempty(struct nine_queue_pool * ctx)220 nine_queue_isempty(struct nine_queue_pool* ctx)
221 {
222     struct nine_cmdbuf *cmdbuf = &ctx->pool[ctx->head];
223 
224     return (ctx->tail == ctx->head) && !cmdbuf->num_instr;
225 }
226 
227 struct nine_queue_pool*
nine_queue_create(void)228 nine_queue_create(void)
229 {
230     unsigned i;
231     struct nine_queue_pool *ctx;
232 
233     ctx = CALLOC_STRUCT(nine_queue_pool);
234     if (!ctx)
235         goto failed;
236 
237     for (i = 0; i < NINE_CMD_BUFS; i++) {
238         ctx->pool[i].mem_pool = MALLOC(NINE_QUEUE_SIZE);
239         if (!ctx->pool[i].mem_pool)
240             goto failed;
241     }
242 
243     cnd_init(&ctx->event_pop);
244     (void) mtx_init(&ctx->mutex_pop, mtx_plain);
245 
246     cnd_init(&ctx->event_push);
247     (void) mtx_init(&ctx->mutex_push, mtx_plain);
248 
249     /* Block until first cmdbuf has been flushed. */
250     ctx->worker_wait = TRUE;
251 
252     return ctx;
253 failed:
254     if (ctx) {
255         for (i = 0; i < NINE_CMD_BUFS; i++) {
256             if (ctx->pool[i].mem_pool)
257                 FREE(ctx->pool[i].mem_pool);
258         }
259         FREE(ctx);
260     }
261     return NULL;
262 }
263 
264 void
nine_queue_delete(struct nine_queue_pool * ctx)265 nine_queue_delete(struct nine_queue_pool *ctx)
266 {
267     unsigned i;
268 
269     mtx_destroy(&ctx->mutex_pop);
270     cnd_destroy(&ctx->event_pop);
271 
272     mtx_destroy(&ctx->mutex_push);
273     cnd_destroy(&ctx->event_push);
274 
275     for (i = 0; i < NINE_CMD_BUFS; i++)
276         FREE(ctx->pool[i].mem_pool);
277 
278     FREE(ctx);
279 }
280