/******************************************************************************
 *
 *  Copyright (C) 2014 Google, Inc.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at:
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 ******************************************************************************/

#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#include "osi/include/allocator.h"
#include "osi/include/fixed_queue.h"
#include "osi/include/list.h"
#include "osi/include/osi.h"
#include "osi/include/semaphore.h"
#include "osi/include/reactor.h"

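// A fixed_queue is a bounded, thread-safe FIFO. Two counting semaphores track
// free slots (enqueue_sem, initialized to |capacity|) and queued items
// (dequeue_sem, initialized to 0), so callers can block, poll, or wait on a
// file descriptor; the mutex protects the underlying list.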
typedef struct fixed_queue_t {
  list_t *list;
  semaphore_t *enqueue_sem;
  semaphore_t *dequeue_sem;
  pthread_mutex_t lock;
  size_t capacity;

  reactor_object_t *dequeue_object;
  fixed_queue_cb dequeue_ready;
  void *dequeue_context;
} fixed_queue_t;

static void internal_dequeue_ready(void *context);

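// Allocates a new queue that holds at most |capacity| items.
// Returns NULL on allocation failure.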
fixed_queue_t *fixed_queue_new(size_t capacity) {
  fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
  if (!ret)
    goto error;

  pthread_mutex_init(&ret->lock, NULL);
  ret->capacity = capacity;

  ret->list = list_new(NULL);
  if (!ret->list)
    goto error;

  ret->enqueue_sem = semaphore_new(capacity);
  if (!ret->enqueue_sem)
    goto error;

  ret->dequeue_sem = semaphore_new(0);
  if (!ret->dequeue_sem)
    goto error;

  return ret;

error:;
  fixed_queue_free(ret, NULL);
  return NULL;
}

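// Frees |queue| and, if |free_cb| is not NULL, every item still queued.
// Any reactor registration is torn down first. |queue| may be NULL.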
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (!queue)
    return;

  fixed_queue_unregister_dequeue(queue);

  if (free_cb)
    for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
      free_cb(list_node(node));

  list_free(queue->list);
  semaphore_free(queue->enqueue_sem);
  semaphore_free(queue->dequeue_sem);
  pthread_mutex_destroy(&queue->lock);
  osi_free(queue);
}

bool fixed_queue_is_empty(fixed_queue_t *queue) {
  assert(queue != NULL);

  pthread_mutex_lock(&queue->lock);
  bool is_empty = list_is_empty(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return is_empty;
}

size_t fixed_queue_capacity(fixed_queue_t *queue) {
  assert(queue != NULL);

  return queue->capacity;
}

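// Blocks until the queue has a free slot, then appends |data|.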
void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  semaphore_wait(queue->enqueue_sem);

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem);
}

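// Blocks until the queue has at least one item, then removes and returns
// the item at the front.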
void *fixed_queue_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  semaphore_wait(queue->dequeue_sem);

  pthread_mutex_lock(&queue->lock);
  void *ret = list_front(queue->list);
  list_remove(queue->list, ret);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->enqueue_sem);

  return ret;
}

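// Non-blocking enqueue: appends |data| and returns true if a slot is free,
// otherwise returns false without waiting.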
bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  if (!semaphore_try_wait(queue->enqueue_sem))
    return false;

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem);
  return true;
}

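// Non-blocking dequeue: removes and returns the front item, or returns NULL
// if the queue is empty.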
void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  if (!semaphore_try_wait(queue->dequeue_sem))
    return NULL;

  pthread_mutex_lock(&queue->lock);
  void *ret = list_front(queue->list);
  list_remove(queue->list, ret);
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->enqueue_sem);

  return ret;
}

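// Returns the front item without removing it, or NULL if the queue is empty.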
void *fixed_queue_try_peek(fixed_queue_t *queue) {
  assert(queue != NULL);

  pthread_mutex_lock(&queue->lock);
  // Because the queue is protected by the lock, the emptiness check and the
  // front read happen atomically, so there is no race between them.
  void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return ret;
}

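// Returns the fd of the dequeue semaphore, which can be watched (e.g. with a
// reactor) to learn when there is data ready to be dequeued.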
int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
  assert(queue != NULL);
  return semaphore_get_fd(queue->dequeue_sem);
}

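// Returns the fd of the enqueue semaphore, which can be watched to learn when
// there is space available in the queue.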
int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
  assert(queue != NULL);
  return semaphore_get_fd(queue->enqueue_sem);
}

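// Registers |queue| with |reactor| so that |ready_cb| is invoked with
// |context| whenever the queue has data ready to be dequeued. Any previous
// registration is replaced.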
void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  assert(queue != NULL);
  assert(reactor != NULL);
  assert(ready_cb != NULL);

  // Make sure we're not already registered
  fixed_queue_unregister_dequeue(queue);

  queue->dequeue_ready = ready_cb;
  queue->dequeue_context = context;
  queue->dequeue_object = reactor_register(
    reactor,
    fixed_queue_get_dequeue_fd(queue),
    queue,
    internal_dequeue_ready,
    NULL
  );
}

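// Removes any dequeue registration made with fixed_queue_register_dequeue.
// Safe to call when no registration exists.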
void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  if (queue->dequeue_object) {
    reactor_unregister(queue->dequeue_object);
    queue->dequeue_object = NULL;
  }
}

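// Reactor trampoline: forwards readiness notifications to the user callback
// registered via fixed_queue_register_dequeue.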
static void internal_dequeue_ready(void *context) {
  assert(context != NULL);

  fixed_queue_t *queue = context;
  queue->dequeue_ready(queue, queue->dequeue_context);
}