1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <assert.h>
20 #include <pthread.h>
21 #include <string.h>
22 
23 #include "osi/include/allocator.h"
24 #include "osi/include/fixed_queue.h"
25 #include "osi/include/list.h"
26 #include "osi/include/osi.h"
27 #include "osi/include/semaphore.h"
28 #include "osi/include/reactor.h"
29 
30 typedef struct fixed_queue_t {
31   list_t *list;
32   semaphore_t *enqueue_sem;
33   semaphore_t *dequeue_sem;
34   pthread_mutex_t lock;
35   size_t capacity;
36 
37   reactor_object_t *dequeue_object;
38   fixed_queue_cb dequeue_ready;
39   void *dequeue_context;
40 } fixed_queue_t;
41 
42 static void internal_dequeue_ready(void *context);
43 
fixed_queue_new(size_t capacity)44 fixed_queue_t *fixed_queue_new(size_t capacity) {
45   fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
46 
47   pthread_mutex_init(&ret->lock, NULL);
48   ret->capacity = capacity;
49 
50   ret->list = list_new(NULL);
51   if (!ret->list)
52     goto error;
53 
54   ret->enqueue_sem = semaphore_new(capacity);
55   if (!ret->enqueue_sem)
56     goto error;
57 
58   ret->dequeue_sem = semaphore_new(0);
59   if (!ret->dequeue_sem)
60     goto error;
61 
62   return ret;
63 
64 error:
65   fixed_queue_free(ret, NULL);
66   return NULL;
67 }
68 
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
70   if (!queue)
71     return;
72 
73   fixed_queue_unregister_dequeue(queue);
74 
75   if (free_cb)
76     for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
77       free_cb(list_node(node));
78 
79   list_free(queue->list);
80   semaphore_free(queue->enqueue_sem);
81   semaphore_free(queue->dequeue_sem);
82   pthread_mutex_destroy(&queue->lock);
83   osi_free(queue);
84 }
85 
fixed_queue_is_empty(fixed_queue_t * queue)86 bool fixed_queue_is_empty(fixed_queue_t *queue) {
87   if (queue == NULL)
88     return true;
89 
90   pthread_mutex_lock(&queue->lock);
91   bool is_empty = list_is_empty(queue->list);
92   pthread_mutex_unlock(&queue->lock);
93 
94   return is_empty;
95 }
96 
fixed_queue_length(fixed_queue_t * queue)97 size_t fixed_queue_length(fixed_queue_t *queue) {
98   if (queue == NULL)
99     return 0;
100 
101   pthread_mutex_lock(&queue->lock);
102   size_t length = list_length(queue->list);
103   pthread_mutex_unlock(&queue->lock);
104 
105   return length;
106 }
107 
fixed_queue_capacity(fixed_queue_t * queue)108 size_t fixed_queue_capacity(fixed_queue_t *queue) {
109   assert(queue != NULL);
110 
111   return queue->capacity;
112 }
113 
fixed_queue_enqueue(fixed_queue_t * queue,void * data)114 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
115   assert(queue != NULL);
116   assert(data != NULL);
117 
118   semaphore_wait(queue->enqueue_sem);
119 
120   pthread_mutex_lock(&queue->lock);
121   list_append(queue->list, data);
122   pthread_mutex_unlock(&queue->lock);
123 
124   semaphore_post(queue->dequeue_sem);
125 }
126 
fixed_queue_dequeue(fixed_queue_t * queue)127 void *fixed_queue_dequeue(fixed_queue_t *queue) {
128   assert(queue != NULL);
129 
130   semaphore_wait(queue->dequeue_sem);
131 
132   pthread_mutex_lock(&queue->lock);
133   void *ret = list_front(queue->list);
134   list_remove(queue->list, ret);
135   pthread_mutex_unlock(&queue->lock);
136 
137   semaphore_post(queue->enqueue_sem);
138 
139   return ret;
140 }
141 
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)142 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
143   assert(queue != NULL);
144   assert(data != NULL);
145 
146   if (!semaphore_try_wait(queue->enqueue_sem))
147     return false;
148 
149   pthread_mutex_lock(&queue->lock);
150   list_append(queue->list, data);
151   pthread_mutex_unlock(&queue->lock);
152 
153   semaphore_post(queue->dequeue_sem);
154   return true;
155 }
156 
fixed_queue_try_dequeue(fixed_queue_t * queue)157 void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
158   if (queue == NULL)
159     return NULL;
160 
161   if (!semaphore_try_wait(queue->dequeue_sem))
162     return NULL;
163 
164   pthread_mutex_lock(&queue->lock);
165   void *ret = list_front(queue->list);
166   list_remove(queue->list, ret);
167   pthread_mutex_unlock(&queue->lock);
168 
169   semaphore_post(queue->enqueue_sem);
170 
171   return ret;
172 }
173 
fixed_queue_try_peek_first(fixed_queue_t * queue)174 void *fixed_queue_try_peek_first(fixed_queue_t *queue) {
175   if (queue == NULL)
176     return NULL;
177 
178   pthread_mutex_lock(&queue->lock);
179   void *ret = list_is_empty(queue->list) ? NULL : list_front(queue->list);
180   pthread_mutex_unlock(&queue->lock);
181 
182   return ret;
183 }
184 
fixed_queue_try_peek_last(fixed_queue_t * queue)185 void *fixed_queue_try_peek_last(fixed_queue_t *queue) {
186   if (queue == NULL)
187     return NULL;
188 
189   pthread_mutex_lock(&queue->lock);
190   void *ret = list_is_empty(queue->list) ? NULL : list_back(queue->list);
191   pthread_mutex_unlock(&queue->lock);
192 
193   return ret;
194 }
195 
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)196 void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data) {
197   if (queue == NULL)
198     return NULL;
199 
200   bool removed = false;
201   pthread_mutex_lock(&queue->lock);
202   if (list_contains(queue->list, data) &&
203       semaphore_try_wait(queue->dequeue_sem)) {
204     removed = list_remove(queue->list, data);
205     assert(removed);
206   }
207   pthread_mutex_unlock(&queue->lock);
208 
209   if (removed) {
210     semaphore_post(queue->enqueue_sem);
211     return data;
212   }
213   return NULL;
214 }
215 
fixed_queue_get_list(fixed_queue_t * queue)216 list_t *fixed_queue_get_list(fixed_queue_t *queue) {
217   assert(queue != NULL);
218 
219   // NOTE: This function is not thread safe, and there is no point for
220   // calling pthread_mutex_lock() / pthread_mutex_unlock()
221   return queue->list;
222 }
223 
224 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)225 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
226   assert(queue != NULL);
227   return semaphore_get_fd(queue->dequeue_sem);
228 }
229 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)230 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
231   assert(queue != NULL);
232   return semaphore_get_fd(queue->enqueue_sem);
233 }
234 
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)235 void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
236   assert(queue != NULL);
237   assert(reactor != NULL);
238   assert(ready_cb != NULL);
239 
240   // Make sure we're not already registered
241   fixed_queue_unregister_dequeue(queue);
242 
243   queue->dequeue_ready = ready_cb;
244   queue->dequeue_context = context;
245   queue->dequeue_object = reactor_register(
246     reactor,
247     fixed_queue_get_dequeue_fd(queue),
248     queue,
249     internal_dequeue_ready,
250     NULL
251   );
252 }
253 
fixed_queue_unregister_dequeue(fixed_queue_t * queue)254 void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
255   assert(queue != NULL);
256 
257   if (queue->dequeue_object) {
258     reactor_unregister(queue->dequeue_object);
259     queue->dequeue_object = NULL;
260   }
261 }
262 
internal_dequeue_ready(void * context)263 static void internal_dequeue_ready(void *context) {
264   assert(context != NULL);
265 
266   fixed_queue_t *queue = context;
267   queue->dequeue_ready(queue, queue->dequeue_context);
268 }
269