1 /*-------------------------------------------------------------------------
2  * drawElements Stream Library
3  * ---------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Thread safe ringbuffer
22  *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
24 
25 #include "deInt32.h"
26 #include "deMemory.h"
27 #include "deSemaphore.h"
28 
29 #include <stdlib.h>
30 #include <stdio.h>
31 
32 struct deRingbuffer_s
33 {
34 	deInt32			blockSize;
35 	deInt32			blockCount;
36 	deInt32*		blockUsage;
37 	deUint8*		buffer;
38 
39 	deSemaphore		emptyCount;
40 	deSemaphore		fullCount;
41 
42 	deInt32			outBlock;
43 	deInt32			outPos;
44 
45 	deInt32			inBlock;
46 	deInt32			inPos;
47 
48 	deBool			stopNotified;
49 	deBool			consumerStopping;
50 };
51 
deRingbuffer_create(deInt32 blockSize,deInt32 blockCount)52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
53 {
54 	deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
55 
56 	DE_ASSERT(ringbuffer);
57 	DE_ASSERT(blockCount > 0);
58 	DE_ASSERT(blockSize > 0);
59 
60 	ringbuffer->blockSize	= blockSize;
61 	ringbuffer->blockCount	= blockCount;
62 	ringbuffer->buffer		= (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount);
63 	ringbuffer->blockUsage	= (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount);
64 	ringbuffer->emptyCount	= deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65 	ringbuffer->fullCount	= deSemaphore_create(0, DE_NULL);
66 
67 	if (!ringbuffer->buffer		||
68 		!ringbuffer->blockUsage	||
69 		!ringbuffer->emptyCount	||
70 		!ringbuffer->fullCount)
71 	{
72 		if (ringbuffer->emptyCount)
73 			deSemaphore_destroy(ringbuffer->emptyCount);
74 		if (ringbuffer->fullCount)
75 			deSemaphore_destroy(ringbuffer->fullCount);
76 		deFree(ringbuffer->buffer);
77 		deFree(ringbuffer->blockUsage);
78 		deFree(ringbuffer);
79 		return DE_NULL;
80 	}
81 
82 	memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount);
83 
84 	ringbuffer->outBlock	= 0;
85 	ringbuffer->outPos		= 0;
86 
87 	ringbuffer->inBlock		= 0;
88 	ringbuffer->inPos		= 0;
89 
90 	ringbuffer->stopNotified		= DE_FALSE;
91 	ringbuffer->consumerStopping	= DE_FALSE;
92 
93 	return ringbuffer;
94 }
95 
deRingbuffer_stop(deRingbuffer * ringbuffer)96 void deRingbuffer_stop (deRingbuffer* ringbuffer)
97 {
98 	/* Set notify to true and increment fullCount to let consumer continue */
99 	ringbuffer->stopNotified = DE_TRUE;
100 	deSemaphore_increment(ringbuffer->fullCount);
101 }
102 
deRingbuffer_destroy(deRingbuffer * ringbuffer)103 void deRingbuffer_destroy (deRingbuffer* ringbuffer)
104 {
105 	deSemaphore_destroy(ringbuffer->emptyCount);
106 	deSemaphore_destroy(ringbuffer->fullCount);
107 
108 	free(ringbuffer->buffer);
109 	free(ringbuffer->blockUsage);
110 	free(ringbuffer);
111 }
112 
producerStream_write(deStreamData * stream,const void * buf,deInt32 bufSize,deInt32 * written)113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
114 {
115 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
116 
117 	DE_ASSERT(stream);
118 	/* If ringbuffer is stopping return error on write */
119 	if (ringbuffer->stopNotified)
120 	{
121 		DE_ASSERT(DE_FALSE);
122 		return DE_STREAMRESULT_ERROR;
123 	}
124 
125 	*written = 0;
126 
127 	/* Write while more data available */
128 	while (*written < bufSize)
129 	{
130 		deInt32		writeSize	= 0;
131 		deUint8*	src			= DE_NULL;
132 		deUint8*	dst			= DE_NULL;
133 
134 		/* If between blocks accuire new block */
135 		if (ringbuffer->inPos == 0)
136 		{
137 			deSemaphore_decrement(ringbuffer->emptyCount);
138 		}
139 
140 		writeSize	= deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
141 		dst			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
142 		src			= (deUint8*)buf + *written;
143 
144 		deMemcpy(dst, src, (size_t)writeSize);
145 
146 		ringbuffer->inPos += writeSize;
147 		*written += writeSize;
148 		ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
149 
150 		/* Block is full move to next one (or "between" this and next block) */
151 		if (ringbuffer->inPos == ringbuffer->blockSize)
152 		{
153 			ringbuffer->inPos = 0;
154 			ringbuffer->inBlock++;
155 
156 			if (ringbuffer->inBlock == ringbuffer->blockCount)
157 				ringbuffer->inBlock = 0;
158 			deSemaphore_increment(ringbuffer->fullCount);
159 		}
160 	}
161 
162 	return DE_STREAMRESULT_SUCCESS;
163 }
164 
producerStream_flush(deStreamData * stream)165 static deStreamResult producerStream_flush (deStreamData* stream)
166 {
167 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
168 
169 	DE_ASSERT(stream);
170 
171 	/* No blocks reserved by producer */
172 	if (ringbuffer->inPos == 0)
173 		return DE_STREAMRESULT_SUCCESS;
174 
175 	ringbuffer->inPos		= 0;
176 	ringbuffer->inBlock++;
177 
178 	if (ringbuffer->inBlock == ringbuffer->blockCount)
179 		ringbuffer->inBlock = 0;
180 
181 	deSemaphore_increment(ringbuffer->fullCount);
182 	return DE_STREAMRESULT_SUCCESS;
183 }
184 
producerStream_deinit(deStreamData * stream)185 static deStreamResult producerStream_deinit (deStreamData* stream)
186 {
187 	DE_ASSERT(stream);
188 
189 	producerStream_flush(stream);
190 
191 	/* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
192 	return DE_STREAMRESULT_SUCCESS;
193 }
194 
consumerStream_read(deStreamData * stream,void * buf,deInt32 bufSize,deInt32 * read)195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
196 {
197 	deRingbuffer* ringbuffer = (deRingbuffer*)stream;
198 
199 	DE_ASSERT(stream);
200 
201 	*read = 0;
202 	DE_ASSERT(ringbuffer);
203 
204 	while (*read < bufSize)
205 	{
206 		deInt32		writeSize	= 0;
207 		deUint8*	src			= DE_NULL;
208 		deUint8*	dst			= DE_NULL;
209 
210 		/* If between blocks accuire new block */
211 		if (ringbuffer->outPos == 0)
212 		{
213 			/* If consumer is set to stop after everything is consumed,
214 			 * do not block if there is no more input left
215 			 */
216 			if (ringbuffer->consumerStopping)
217 			{
218 				/* Try to accuire new block, if can't there is no more input */
219 				if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
220 				{
221 					return DE_STREAMRESULT_END_OF_STREAM;
222 				}
223 			}
224 			else
225 			{
226 				/* If not stopping block until there is more input */
227 				deSemaphore_decrement(ringbuffer->fullCount);
228 				/* Ringbuffer was set to stop */
229 				if (ringbuffer->stopNotified)
230 				{
231 					ringbuffer->consumerStopping = DE_TRUE;
232 				}
233 			}
234 
235 		}
236 
237 		writeSize	= deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
238 		src			= ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
239 		dst			= (deUint8*)buf + *read;
240 
241 		deMemcpy(dst, src, (size_t)writeSize);
242 
243 		ringbuffer->outPos += writeSize;
244 		*read += writeSize;
245 
246 		/* Block is consumed move to next one (or "between" this and next block) */
247 		if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
248 		{
249 			ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
250 			ringbuffer->outPos = 0;
251 			ringbuffer->outBlock++;
252 
253 			if (ringbuffer->outBlock == ringbuffer->blockCount)
254 				ringbuffer->outBlock = 0;
255 
256 			deSemaphore_increment(ringbuffer->emptyCount);
257 		}
258 	}
259 
260 	return DE_STREAMRESULT_SUCCESS;
261 }
262 
263 
consumerStream_deinit(deStreamData * stream)264 static deStreamResult consumerStream_deinit (deStreamData* stream)
265 {
266 	DE_ASSERT(stream);
267 	DE_UNREF(stream);
268 
269 	return DE_STREAMRESULT_SUCCESS;
270 }
271 
272 /* There are no sensible errors so status is always good */
dummy_getStatus(deStreamData * stream)273 deStreamStatus dummy_getStatus (deStreamData* stream)
274 {
275 	DE_UNREF(stream);
276 
277 	return DE_STREAMSTATUS_GOOD;
278 }
279 
280 /* There are no sensible errors in ringbuffer */
dummy_getError(deStreamData * stream)281 static const char* dummy_getError (deStreamData* stream)
282 {
283 	DE_ASSERT(stream);
284 	DE_UNREF(stream);
285 	return DE_NULL;
286 }
287 
288 static const deIOStreamVFTable producerStreamVFTable = {
289 	DE_NULL,
290 	producerStream_write,
291 	dummy_getError,
292 	producerStream_flush,
293 	producerStream_deinit,
294 	dummy_getStatus
295 };
296 
297 static const deIOStreamVFTable consumerStreamVFTable = {
298 	consumerStream_read,
299 	DE_NULL,
300 	dummy_getError,
301 	DE_NULL,
302 	consumerStream_deinit,
303 	dummy_getStatus
304 };
305 
deProducerStream_init(deOutStream * stream,deRingbuffer * buffer)306 void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
307 {
308 	stream->ioStream.streamData = (deStreamData*)buffer;
309 	stream->ioStream.vfTable = &producerStreamVFTable;
310 }
311 
deConsumerStream_init(deInStream * stream,deRingbuffer * buffer)312 void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
313 {
314 	stream->ioStream.streamData = (deStreamData*)buffer;
315 	stream->ioStream.vfTable = &consumerStreamVFTable;
316 }
317