1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <assert.h>
20 #include <pthread.h>
21 #include <string.h>
22
23 #include "osi/include/allocator.h"
24 #include "osi/include/fixed_queue.h"
25 #include "osi/include/list.h"
26 #include "osi/include/osi.h"
27 #include "osi/include/semaphore.h"
28 #include "osi/include/reactor.h"
29
30 typedef struct fixed_queue_t {
31 list_t *list;
32 semaphore_t *enqueue_sem;
33 semaphore_t *dequeue_sem;
34 pthread_mutex_t lock;
35 size_t capacity;
36
37 reactor_object_t *dequeue_object;
38 fixed_queue_cb dequeue_ready;
39 void *dequeue_context;
40 } fixed_queue_t;
41
42 static void internal_dequeue_ready(void *context);
43
fixed_queue_new(size_t capacity)44 fixed_queue_t *fixed_queue_new(size_t capacity) {
45 fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
46
47 pthread_mutex_init(&ret->lock, NULL);
48 ret->capacity = capacity;
49
50 ret->list = list_new(NULL);
51 if (!ret->list)
52 goto error;
53
54 ret->enqueue_sem = semaphore_new(capacity);
55 if (!ret->enqueue_sem)
56 goto error;
57
58 ret->dequeue_sem = semaphore_new(0);
59 if (!ret->dequeue_sem)
60 goto error;
61
62 return ret;
63
64 error:
65 fixed_queue_free(ret, NULL);
66 return NULL;
67 }
68
// Tears down |queue| and releases everything it owns. A NULL |queue| is a
// no-op. When |free_cb| is provided it is called once for every element still
// in the queue, in list order, before the list itself is freed.
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (queue == NULL)
    return;

  // Drop any reactor registration before the queue goes away.
  fixed_queue_unregister_dequeue(queue);

  if (free_cb != NULL) {
    const list_node_t *cursor = list_begin(queue->list);
    while (cursor != list_end(queue->list)) {
      free_cb(list_node(cursor));
      cursor = list_next(cursor);
    }
  }

  list_free(queue->list);
  semaphore_free(queue->enqueue_sem);
  semaphore_free(queue->dequeue_sem);
  pthread_mutex_destroy(&queue->lock);
  osi_free(queue);
}
85
// Reports whether |queue| currently holds no elements. A NULL |queue| is
// treated as empty.
bool fixed_queue_is_empty(fixed_queue_t *queue) {
  bool empty = true;

  if (queue != NULL) {
    // Sample the list state while holding the queue lock.
    pthread_mutex_lock(&queue->lock);
    empty = list_is_empty(queue->list);
    pthread_mutex_unlock(&queue->lock);
  }

  return empty;
}
96
// Returns the number of elements currently in |queue|, or 0 when |queue| is
// NULL.
size_t fixed_queue_length(fixed_queue_t *queue) {
  size_t count = 0;

  if (queue != NULL) {
    // Read the list length while holding the queue lock.
    pthread_mutex_lock(&queue->lock);
    count = list_length(queue->list);
    pthread_mutex_unlock(&queue->lock);
  }

  return count;
}
107
// Returns the capacity recorded for |queue| when it was created. |queue| must
// not be NULL. The value is read without taking the queue lock.
size_t fixed_queue_capacity(fixed_queue_t *queue) {
  assert(queue != NULL);

  const size_t configured = queue->capacity;
  return configured;
}
113
// Appends |data| to the tail of |queue|. Neither argument may be NULL.
// The call first waits on the enqueue semaphore (no try/timeout variant is
// used here), then signals the dequeue semaphore once the element is stored.
void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  // Claim a slot before touching the list.
  semaphore_wait(queue->enqueue_sem);

  // Critical section: the list is only modified under the queue lock.
  {
    pthread_mutex_lock(&queue->lock);
    list_append(queue->list, data);
    pthread_mutex_unlock(&queue->lock);
  }

  // Announce that one more element is available to consumers.
  semaphore_post(queue->dequeue_sem);
}
126
// Removes and returns the element at the front of |queue|. |queue| must not
// be NULL. The call first waits on the dequeue semaphore, then signals the
// enqueue semaphore once the element has been removed.
void *fixed_queue_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  void *front = NULL;

  // Wait until the dequeue semaphore can be taken.
  semaphore_wait(queue->dequeue_sem);

  // Critical section: detach the head element under the queue lock.
  pthread_mutex_lock(&queue->lock);
  front = list_front(queue->list);
  list_remove(queue->list, front);
  pthread_mutex_unlock(&queue->lock);

  // Hand the freed slot back to producers.
  semaphore_post(queue->enqueue_sem);

  return front;
}
141
// Attempts to append |data| to |queue| using semaphore_try_wait() on the
// enqueue semaphore. Neither argument may be NULL.
//
// Returns true if the element was stored, false if the enqueue semaphore
// could not be taken (in which case the queue is left untouched).
bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  if (semaphore_try_wait(queue->enqueue_sem)) {
    // Slot claimed: store the element under the queue lock.
    pthread_mutex_lock(&queue->lock);
    list_append(queue->list, data);
    pthread_mutex_unlock(&queue->lock);

    // Announce that one more element is available to consumers.
    semaphore_post(queue->dequeue_sem);
    return true;
  }

  return false;
}
156
// Attempts to remove and return the front element of |queue| using
// semaphore_try_wait() on the dequeue semaphore.
//
// Returns NULL when |queue| is NULL or when the dequeue semaphore could not
// be taken; otherwise returns the removed element.
void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
  // Short-circuit evaluation keeps the semaphore untouched for a NULL queue.
  if (queue == NULL || !semaphore_try_wait(queue->dequeue_sem))
    return NULL;

  void *front = NULL;

  // Critical section: detach the head element under the queue lock.
  pthread_mutex_lock(&queue->lock);
  front = list_front(queue->list);
  list_remove(queue->list, front);
  pthread_mutex_unlock(&queue->lock);

  // Hand the freed slot back to producers.
  semaphore_post(queue->enqueue_sem);

  return front;
}
173
// Returns the element at the front of |queue| without removing it.
// Returns NULL when |queue| is NULL or holds no elements. Neither semaphore
// is touched by this call.
void *fixed_queue_try_peek_first(fixed_queue_t *queue) {
  if (queue == NULL)
    return NULL;

  void *head = NULL;

  pthread_mutex_lock(&queue->lock);
  if (!list_is_empty(queue->list))
    head = list_front(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return head;
}
184
// Returns the element at the back of |queue| without removing it.
// Returns NULL when |queue| is NULL or holds no elements. Neither semaphore
// is touched by this call.
void *fixed_queue_try_peek_last(fixed_queue_t *queue) {
  if (queue == NULL)
    return NULL;

  void *tail = NULL;

  pthread_mutex_lock(&queue->lock);
  if (!list_is_empty(queue->list))
    tail = list_back(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return tail;
}
195
// Attempts to remove the specific element |data| from |queue|.
//
// Returns |data| if it was found and removed. Returns NULL when |queue| is
// NULL, when |data| is not in the list, or when the dequeue semaphore could
// not be taken with semaphore_try_wait().
void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) {
  if (queue == NULL)
    return NULL;

  bool removed = false;

  pthread_mutex_lock(&queue->lock);
  if (list_contains(queue->list, data)) {
    // Only consume a dequeue token once we know the element is present.
    if (semaphore_try_wait(queue->dequeue_sem)) {
      removed = list_remove(queue->list, data);
      assert(removed);
    }
  }
  pthread_mutex_unlock(&queue->lock);

  if (!removed)
    return NULL;

  // Hand the freed slot back to producers (done outside the lock).
  semaphore_post(queue->enqueue_sem);
  return data;
}
215
fixed_queue_get_list(fixed_queue_t * queue)216 list_t *fixed_queue_get_list(fixed_queue_t *queue) {
217 assert(queue != NULL);
218
219 // NOTE: This function is not thread safe, and there is no point for
220 // calling pthread_mutex_lock() / pthread_mutex_unlock()
221 return queue->list;
222 }
223
224
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)225 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
226 assert(queue != NULL);
227 return semaphore_get_fd(queue->dequeue_sem);
228 }
229
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)230 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
231 assert(queue != NULL);
232 return semaphore_get_fd(queue->enqueue_sem);
233 }
234
// Registers |queue| with |reactor| on the queue's dequeue file descriptor.
// The reactor is given internal_dequeue_ready() as its first callback and
// NULL as its second; internal_dequeue_ready() in turn calls
// |ready_cb|(queue, context). None of |queue|, |reactor| or |ready_cb| may be
// NULL. Any existing registration is dropped first.
void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  assert(queue != NULL);
  assert(reactor != NULL);
  assert(ready_cb != NULL);

  // Make sure we're not already registered
  fixed_queue_unregister_dequeue(queue);

  // Store the user callback before the reactor can start dispatching to us.
  queue->dequeue_ready = ready_cb;
  queue->dequeue_context = context;

  const int dequeue_fd = fixed_queue_get_dequeue_fd(queue);
  queue->dequeue_object =
      reactor_register(reactor, dequeue_fd, queue, internal_dequeue_ready, NULL);
}
253
// Removes |queue|'s reactor registration, if it has one. |queue| must not be
// NULL. Calling this on a queue that is not registered does nothing.
void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  if (!queue->dequeue_object)
    return;

  reactor_unregister(queue->dequeue_object);
  // Clear the handle so a later call sees the queue as unregistered.
  queue->dequeue_object = NULL;
}
262
internal_dequeue_ready(void * context)263 static void internal_dequeue_ready(void *context) {
264 assert(context != NULL);
265
266 fixed_queue_t *queue = context;
267 queue->dequeue_ready(queue, queue->dequeue_context);
268 }
269