1 /******************************************************************************
2  *
3  *  Copyright 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <base/logging.h>
20 #include <string.h>
21 
22 #include <mutex>
23 
24 #include "osi/include/allocator.h"
25 #include "osi/include/fixed_queue.h"
26 #include "osi/include/list.h"
27 #include "osi/include/osi.h"
28 #include "osi/include/reactor.h"
29 #include "osi/include/semaphore.h"
30 
31 typedef struct fixed_queue_t {
32   list_t* list;
33   semaphore_t* enqueue_sem;
34   semaphore_t* dequeue_sem;
35   std::mutex* mutex;
36   size_t capacity;
37 
38   reactor_object_t* dequeue_object;
39   fixed_queue_cb dequeue_ready;
40   void* dequeue_context;
41 } fixed_queue_t;
42 
43 static void internal_dequeue_ready(void* context);
44 
fixed_queue_new(size_t capacity)45 fixed_queue_t* fixed_queue_new(size_t capacity) {
46   fixed_queue_t* ret =
47       static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
48 
49   ret->mutex = new std::mutex;
50   ret->capacity = capacity;
51 
52   ret->list = list_new(NULL);
53   if (!ret->list) goto error;
54 
55   ret->enqueue_sem = semaphore_new(capacity);
56   if (!ret->enqueue_sem) goto error;
57 
58   ret->dequeue_sem = semaphore_new(0);
59   if (!ret->dequeue_sem) goto error;
60 
61   return ret;
62 
63 error:
64   fixed_queue_free(ret, NULL);
65   return NULL;
66 }
67 
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)68 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
69   if (!queue) return;
70 
71   fixed_queue_unregister_dequeue(queue);
72 
73   if (free_cb)
74     for (const list_node_t* node = list_begin(queue->list);
75          node != list_end(queue->list); node = list_next(node))
76       free_cb(list_node(node));
77 
78   list_free(queue->list);
79   semaphore_free(queue->enqueue_sem);
80   semaphore_free(queue->dequeue_sem);
81   delete queue->mutex;
82   osi_free(queue);
83 }
84 
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)85 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
86   if (!queue) return;
87 
88   while (!fixed_queue_is_empty(queue)) {
89     void* data = fixed_queue_try_dequeue(queue);
90     if (free_cb != NULL) {
91       free_cb(data);
92     }
93   }
94 }
95 
fixed_queue_is_empty(fixed_queue_t * queue)96 bool fixed_queue_is_empty(fixed_queue_t* queue) {
97   if (queue == NULL) return true;
98 
99   std::lock_guard<std::mutex> lock(*queue->mutex);
100   return list_is_empty(queue->list);
101 }
102 
fixed_queue_length(fixed_queue_t * queue)103 size_t fixed_queue_length(fixed_queue_t* queue) {
104   if (queue == NULL) return 0;
105 
106   std::lock_guard<std::mutex> lock(*queue->mutex);
107   return list_length(queue->list);
108 }
109 
fixed_queue_capacity(fixed_queue_t * queue)110 size_t fixed_queue_capacity(fixed_queue_t* queue) {
111   CHECK(queue != NULL);
112 
113   return queue->capacity;
114 }
115 
fixed_queue_enqueue(fixed_queue_t * queue,void * data)116 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
117   CHECK(queue != NULL);
118   CHECK(data != NULL);
119 
120   semaphore_wait(queue->enqueue_sem);
121 
122   {
123     std::lock_guard<std::mutex> lock(*queue->mutex);
124     list_append(queue->list, data);
125   }
126 
127   semaphore_post(queue->dequeue_sem);
128 }
129 
fixed_queue_dequeue(fixed_queue_t * queue)130 void* fixed_queue_dequeue(fixed_queue_t* queue) {
131   CHECK(queue != NULL);
132 
133   semaphore_wait(queue->dequeue_sem);
134 
135   void* ret = NULL;
136   {
137     std::lock_guard<std::mutex> lock(*queue->mutex);
138     ret = list_front(queue->list);
139     list_remove(queue->list, ret);
140   }
141 
142   semaphore_post(queue->enqueue_sem);
143 
144   return ret;
145 }
146 
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)147 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
148   CHECK(queue != NULL);
149   CHECK(data != NULL);
150 
151   if (!semaphore_try_wait(queue->enqueue_sem)) return false;
152 
153   {
154     std::lock_guard<std::mutex> lock(*queue->mutex);
155     list_append(queue->list, data);
156   }
157 
158   semaphore_post(queue->dequeue_sem);
159   return true;
160 }
161 
fixed_queue_try_dequeue(fixed_queue_t * queue)162 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
163   if (queue == NULL) return NULL;
164 
165   if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
166 
167   void* ret = NULL;
168   {
169     std::lock_guard<std::mutex> lock(*queue->mutex);
170     ret = list_front(queue->list);
171     list_remove(queue->list, ret);
172   }
173 
174   semaphore_post(queue->enqueue_sem);
175 
176   return ret;
177 }
178 
fixed_queue_try_peek_first(fixed_queue_t * queue)179 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
180   if (queue == NULL) return NULL;
181 
182   std::lock_guard<std::mutex> lock(*queue->mutex);
183   return list_is_empty(queue->list) ? NULL : list_front(queue->list);
184 }
185 
fixed_queue_try_peek_last(fixed_queue_t * queue)186 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
187   if (queue == NULL) return NULL;
188 
189   std::lock_guard<std::mutex> lock(*queue->mutex);
190   return list_is_empty(queue->list) ? NULL : list_back(queue->list);
191 }
192 
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)193 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
194   if (queue == NULL) return NULL;
195 
196   bool removed = false;
197   {
198     std::lock_guard<std::mutex> lock(*queue->mutex);
199     if (list_contains(queue->list, data) &&
200         semaphore_try_wait(queue->dequeue_sem)) {
201       removed = list_remove(queue->list, data);
202       CHECK(removed);
203     }
204   }
205 
206   if (removed) {
207     semaphore_post(queue->enqueue_sem);
208     return data;
209   }
210   return NULL;
211 }
212 
fixed_queue_get_list(fixed_queue_t * queue)213 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
214   CHECK(queue != NULL);
215 
216   // NOTE: Using the list in this way is not thread-safe.
217   // Using this list in any context where threads can call other functions
218   // to the queue can break our assumptions and the queue in general.
219   return queue->list;
220 }
221 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)222 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
223   CHECK(queue != NULL);
224   return semaphore_get_fd(queue->dequeue_sem);
225 }
226 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)227 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
228   CHECK(queue != NULL);
229   return semaphore_get_fd(queue->enqueue_sem);
230 }
231 
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)232 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
233                                   fixed_queue_cb ready_cb, void* context) {
234   CHECK(queue != NULL);
235   CHECK(reactor != NULL);
236   CHECK(ready_cb != NULL);
237 
238   // Make sure we're not already registered
239   fixed_queue_unregister_dequeue(queue);
240 
241   queue->dequeue_ready = ready_cb;
242   queue->dequeue_context = context;
243   queue->dequeue_object =
244       reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
245                        internal_dequeue_ready, NULL);
246 }
247 
fixed_queue_unregister_dequeue(fixed_queue_t * queue)248 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
249   CHECK(queue != NULL);
250 
251   if (queue->dequeue_object) {
252     reactor_unregister(queue->dequeue_object);
253     queue->dequeue_object = NULL;
254   }
255 }
256 
internal_dequeue_ready(void * context)257 static void internal_dequeue_ready(void* context) {
258   CHECK(context != NULL);
259 
260   fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
261   queue->dequeue_ready(queue, queue->dequeue_context);
262 }
263