1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <assert.h>
20 #include <pthread.h>
21 #include <stdlib.h>
22 
23 #include "osi/include/allocator.h"
24 #include "osi/include/fixed_queue.h"
25 #include "osi/include/list.h"
26 #include "osi/include/osi.h"
27 #include "osi/include/semaphore.h"
28 #include "osi/include/reactor.h"
29 
30 typedef struct fixed_queue_t {
31   list_t *list;
32   semaphore_t *enqueue_sem;
33   semaphore_t *dequeue_sem;
34   pthread_mutex_t lock;
35   size_t capacity;
36 
37   reactor_object_t *dequeue_object;
38   fixed_queue_cb dequeue_ready;
39   void *dequeue_context;
40 } fixed_queue_t;
41 
42 static void internal_dequeue_ready(void *context);
43 
fixed_queue_new(size_t capacity)44 fixed_queue_t *fixed_queue_new(size_t capacity) {
45   fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
46   if (!ret)
47     goto error;
48 
49   pthread_mutex_init(&ret->lock, NULL);
50   ret->capacity = capacity;
51 
52   ret->list = list_new(NULL);
53   if (!ret->list)
54     goto error;
55 
56   ret->enqueue_sem = semaphore_new(capacity);
57   if (!ret->enqueue_sem)
58     goto error;
59 
60   ret->dequeue_sem = semaphore_new(0);
61   if (!ret->dequeue_sem)
62     goto error;
63 
64   return ret;
65 
66 error:;
67   fixed_queue_free(ret, NULL);
68   return NULL;
69 }
70 
// Destroys |queue| and everything it owns. Does nothing when |queue| is NULL.
//
// The queue is first unregistered from any reactor. If |free_cb| is not NULL
// it is invoked once with the data of every element still in the list. After
// that the list, both semaphores, the mutex and the queue memory itself are
// released, in that order.
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
  if (queue == NULL) {
    return;
  }

  fixed_queue_unregister_dequeue(queue);

  if (free_cb != NULL) {
    // Hand every remaining element to the caller-supplied callback.
    const list_node_t *cursor = list_begin(queue->list);
    while (cursor != list_end(queue->list)) {
      free_cb(list_node(cursor));
      cursor = list_next(cursor);
    }
  }

  list_free(queue->list);
  semaphore_free(queue->enqueue_sem);
  semaphore_free(queue->dequeue_sem);
  pthread_mutex_destroy(&queue->lock);
  osi_free(queue);
}
87 
// Reports whether |queue| currently holds no elements. The list is inspected
// while holding the queue lock. |queue| must not be NULL.
bool fixed_queue_is_empty(fixed_queue_t *queue) {
  assert(queue != NULL);

  bool has_no_items;

  pthread_mutex_lock(&queue->lock);
  has_no_items = list_is_empty(queue->list);
  pthread_mutex_unlock(&queue->lock);

  return has_no_items;
}
97 
// Returns the capacity value stored in |queue| when it was created.
// |queue| must not be NULL.
size_t fixed_queue_capacity(fixed_queue_t *queue) {
  assert(queue != NULL);

  const size_t stored_capacity = queue->capacity;
  return stored_capacity;
}
103 
// Appends |data| to the back of |queue|. Neither argument may be NULL.
//
// The call first waits on the enqueue semaphore (presumably blocking while
// the queue is full -- confirm against the semaphore implementation), then
// appends under the queue lock and finally posts the dequeue semaphore.
void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  // Claim a slot before touching the list.
  semaphore_wait(queue->enqueue_sem);

  {
    pthread_mutex_lock(&queue->lock);
    list_append(queue->list, data);
    pthread_mutex_unlock(&queue->lock);
  }

  // Announce that one more element can be dequeued.
  semaphore_post(queue->dequeue_sem);
}
116 
// Removes and returns the element at the front of |queue|.
// |queue| must not be NULL.
//
// The call first waits on the dequeue semaphore (presumably blocking while
// the queue is empty -- confirm against the semaphore implementation), then
// takes the front element under the queue lock and finally posts the enqueue
// semaphore.
void *fixed_queue_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  // Claim one element before touching the list.
  semaphore_wait(queue->dequeue_sem);

  pthread_mutex_lock(&queue->lock);
  void *head = list_front(queue->list);
  list_remove(queue->list, head);
  pthread_mutex_unlock(&queue->lock);

  // Announce that one more slot is available for enqueueing.
  semaphore_post(queue->enqueue_sem);

  return head;
}
131 
// Attempts to append |data| to the back of |queue| without waiting on the
// enqueue semaphore. Neither argument may be NULL.
//
// Returns true if the element was appended, or false if
// semaphore_try_wait on the enqueue semaphore did not succeed; in the latter
// case the queue is left untouched.
bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
  assert(queue != NULL);
  assert(data != NULL);

  if (semaphore_try_wait(queue->enqueue_sem)) {
    pthread_mutex_lock(&queue->lock);
    list_append(queue->list, data);
    pthread_mutex_unlock(&queue->lock);

    // Announce that one more element can be dequeued.
    semaphore_post(queue->dequeue_sem);
    return true;
  }

  return false;
}
146 
// Attempts to remove the element at the front of |queue| without waiting on
// the dequeue semaphore. |queue| must not be NULL.
//
// Returns the removed element, or NULL if semaphore_try_wait on the dequeue
// semaphore did not succeed; in the latter case the queue is left untouched.
void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  void *head = NULL;

  if (semaphore_try_wait(queue->dequeue_sem)) {
    pthread_mutex_lock(&queue->lock);
    head = list_front(queue->list);
    list_remove(queue->list, head);
    pthread_mutex_unlock(&queue->lock);

    // Announce that one more slot is available for enqueueing.
    semaphore_post(queue->enqueue_sem);
  }

  return head;
}
162 
// Returns the element at the front of |queue| without removing it, or NULL
// when the list is empty. |queue| must not be NULL. Neither semaphore is
// touched by this call.
void *fixed_queue_try_peek(fixed_queue_t *queue) {
  assert(queue != NULL);

  void *front = NULL;

  // The emptiness check and the front lookup happen under the same lock
  // acquisition, so no other locked operation can run between them.
  pthread_mutex_lock(&queue->lock);
  if (!list_is_empty(queue->list)) {
    front = list_front(queue->list);
  }
  pthread_mutex_unlock(&queue->lock);

  return front;
}
173 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)174 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
175   assert(queue != NULL);
176   return semaphore_get_fd(queue->dequeue_sem);
177 }
178 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)179 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
180   assert(queue != NULL);
181   return semaphore_get_fd(queue->enqueue_sem);
182 }
183 
// Registers |queue| with |reactor| on the queue's dequeue file descriptor.
// When the reactor fires, |ready_cb| is called with |queue| and |context|.
// |queue|, |reactor| and |ready_cb| must not be NULL; |context| may be.
//
// Any existing registration of this queue is dropped first, so calling this
// function again replaces the previous registration.
void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  assert(queue != NULL);
  assert(reactor != NULL);
  assert(ready_cb != NULL);

  // Drop a previous registration, if there is one.
  fixed_queue_unregister_dequeue(queue);

  // Store the callback and its context before registering, since
  // internal_dequeue_ready reads both fields when it runs.
  queue->dequeue_context = context;
  queue->dequeue_ready = ready_cb;

  // NOTE(review): the trailing NULL is presumably the reactor's second
  // (write-ready) callback slot -- confirm against reactor.h.
  const int dequeue_fd = fixed_queue_get_dequeue_fd(queue);
  queue->dequeue_object = reactor_register(reactor, dequeue_fd, queue, internal_dequeue_ready, NULL);
}
202 
// Removes the reactor registration of |queue|, if it has one, and clears the
// stored registration handle. Does nothing when the handle is already NULL.
// |queue| must not be NULL.
void fixed_queue_unregister_dequeue(fixed_queue_t *queue) {
  assert(queue != NULL);

  if (queue->dequeue_object == NULL) {
    return;
  }

  reactor_unregister(queue->dequeue_object);
  queue->dequeue_object = NULL;
}
211 
internal_dequeue_ready(void * context)212 static void internal_dequeue_ready(void *context) {
213   assert(context != NULL);
214 
215   fixed_queue_t *queue = context;
216   queue->dequeue_ready(queue, queue->dequeue_context);
217 }
218