1 #ifndef _DEBLOCKBUFFER_HPP
2 #define _DEBLOCKBUFFER_HPP
3 /*-------------------------------------------------------------------------
4  * drawElements C++ Base Library
5  * -----------------------------
6  *
7  * Copyright 2014 The Android Open Source Project
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  *//*!
22  * \file
23  * \brief Block-based thread-safe queue.
24  *//*--------------------------------------------------------------------*/
25 
#include "deBlockBuffer.hpp"
#include "deMutex.hpp"
#include "deSemaphore.h"

#include <exception>
#include <new>
31 
32 namespace de
33 {
34 
//! Self-test entry point for BlockBuffer. Only declared here; the definition lives outside this header.
void BlockBuffer_selfTest (void);
36 
/*--------------------------------------------------------------------*//*!
 * \brief Exception type carrying the fixed message "BufferCanceledException".
 *
 * Derives from std::exception so it can be caught through the standard
 * base class. It holds no state of its own.
 *//*--------------------------------------------------------------------*/
class BufferCanceledException : public std::exception
{
public:
	BufferCanceledException (void)
	{
	}

	~BufferCanceledException (void) throw()
	{
	}

	//! Returns a pointer to a static, NUL-terminated description string.
	const char* what (void) const throw()
	{
		return "BufferCanceledException";
	}
};
45 
46 template <typename T>
47 class BlockBuffer
48 {
49 public:
50 	typedef BufferCanceledException CanceledException;
51 
52 					BlockBuffer			(int blockSize, int numBlocks);
53 					~BlockBuffer		(void);
54 
55 	void			clear				(void); //!< Resets buffer. Will block until pending writes and reads have completed.
56 
57 	void			write				(int numElements, const T* elements);
58 	int				tryWrite			(int numElements, const T* elements);
59 	void			flush				(void);
60 	bool			tryFlush			(void);
61 
62 	void			read				(int numElements, T* elements);
63 	int				tryRead				(int numElements, T* elements);
64 
65 	void			cancel				(void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
isCanceled(void) const66 	bool			isCanceled			(void) const { return !!m_canceled; }
67 
68 private:
69 					BlockBuffer			(const BlockBuffer& other);
70 	BlockBuffer&	operator=			(const BlockBuffer& other);
71 
72 	int				writeToCurrentBlock	(int numElements, const T* elements, bool blocking);
73 	int				readFromCurrentBlock(int numElements, T* elements, bool blocking);
74 
75 	void			flushWriteBlock		(void);
76 
77 	deSemaphore		m_fill;				//!< Block fill count.
78 	deSemaphore		m_empty;			//!< Block empty count.
79 
80 	int				m_writeBlock;		//!< Current write block ndx.
81 	int				m_writePos;			//!< Position in block. 0 if block is not yet acquired.
82 
83 	int				m_readBlock;		//!< Current read block ndx.
84 	int				m_readPos;			//!< Position in block. 0 if block is not yet acquired.
85 
86 	int				m_blockSize;
87 	int				m_numBlocks;
88 
89 	T*				m_elements;
90 	int*			m_numUsedInBlock;
91 
92 	Mutex			m_writeLock;
93 	Mutex			m_readLock;
94 
95 	volatile deUint32	m_canceled;
96 } DE_WARN_UNUSED_TYPE;
97 
98 template <typename T>
BlockBuffer(int blockSize,int numBlocks)99 BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks)
100 	: m_fill			(0)
101 	, m_empty			(0)
102 	, m_writeBlock		(0)
103 	, m_writePos		(0)
104 	, m_readBlock		(0)
105 	, m_readPos			(0)
106 	, m_blockSize		(blockSize)
107 	, m_numBlocks		(numBlocks)
108 	, m_elements		(DE_NULL)
109 	, m_numUsedInBlock	(DE_NULL)
110 	, m_writeLock		()
111 	, m_readLock		()
112 	, m_canceled		(DE_FALSE)
113 {
114 	DE_ASSERT(blockSize > 0);
115 	DE_ASSERT(numBlocks > 0);
116 
117 	try
118 	{
119 		m_elements			= new T[m_numBlocks*m_blockSize];
120 		m_numUsedInBlock	= new int[m_numBlocks];
121 	}
122 	catch (...)
123 	{
124 		delete[] m_elements;
125 		delete[] m_numUsedInBlock;
126 		throw;
127 	}
128 
129 	m_fill	= deSemaphore_create(0, DE_NULL);
130 	m_empty	= deSemaphore_create(numBlocks, DE_NULL);
131 	DE_ASSERT(m_fill && m_empty);
132 }
133 
134 template <typename T>
~BlockBuffer(void)135 BlockBuffer<T>::~BlockBuffer (void)
136 {
137 	delete[] m_elements;
138 	delete[] m_numUsedInBlock;
139 
140 	deSemaphore_destroy(m_fill);
141 	deSemaphore_destroy(m_empty);
142 }
143 
144 template <typename T>
clear(void)145 void BlockBuffer<T>::clear (void)
146 {
147 	ScopedLock readLock		(m_readLock);
148 	ScopedLock writeLock	(m_writeLock);
149 
150 	deSemaphore_destroy(m_fill);
151 	deSemaphore_destroy(m_empty);
152 
153 	m_fill			= deSemaphore_create(0, DE_NULL);
154 	m_empty			= deSemaphore_create(m_numBlocks, DE_NULL);
155 	m_writeBlock	= 0;
156 	m_writePos		= 0;
157 	m_readBlock		= 0;
158 	m_readPos		= 0;
159 	m_canceled		= DE_FALSE;
160 
161 	DE_ASSERT(m_fill && m_empty);
162 }
163 
164 template <typename T>
cancel(void)165 void BlockBuffer<T>::cancel (void)
166 {
167 	DE_ASSERT(!m_canceled);
168 	m_canceled = DE_TRUE;
169 
170 	deSemaphore_increment(m_empty);
171 	deSemaphore_increment(m_fill);
172 }
173 
174 template <typename T>
writeToCurrentBlock(int numElements,const T * elements,bool blocking)175 int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking)
176 {
177 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
178 
179 	if (m_writePos == 0)
180 	{
181 		/* Write thread doesn't own current block - need to acquire. */
182 		if (blocking)
183 			deSemaphore_decrement(m_empty);
184 		else
185 		{
186 			if (!deSemaphore_tryDecrement(m_empty))
187 				return 0;
188 		}
189 
190 		/* Check for canceled bit. */
191 		if (m_canceled)
192 		{
193 			// \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
194 			deSemaphore_increment(m_empty);
195 			m_writeLock.unlock();
196 			throw CanceledException();
197 		}
198 	}
199 
200 	/* Write thread owns current block. */
201 	T*		block			= m_elements + m_writeBlock*m_blockSize;
202 	int		numToWrite		= de::min(numElements, m_blockSize-m_writePos);
203 
204 	DE_ASSERT(numToWrite > 0);
205 
206 	for (int ndx = 0; ndx < numToWrite; ndx++)
207 		block[m_writePos+ndx] = elements[ndx];
208 
209 	m_writePos += numToWrite;
210 
211 	if (m_writePos == m_blockSize)
212 		flushWriteBlock(); /* Flush current write block. */
213 
214 	return numToWrite;
215 }
216 
217 template <typename T>
readFromCurrentBlock(int numElements,T * elements,bool blocking)218 int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking)
219 {
220 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
221 
222 	if (m_readPos == 0)
223 	{
224 		/* Read thread doesn't own current block - need to acquire. */
225 		if (blocking)
226 			deSemaphore_decrement(m_fill);
227 		else
228 		{
229 			if (!deSemaphore_tryDecrement(m_fill))
230 				return 0;
231 		}
232 
233 		/* Check for canceled bit. */
234 		if (m_canceled)
235 		{
236 			// \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
237 			deSemaphore_increment(m_fill);
238 			m_readLock.unlock();
239 			throw CanceledException();
240 		}
241 	}
242 
243 	/* Read thread now owns current block. */
244 	const T*	block			= m_elements + m_readBlock*m_blockSize;
245 	int			numUsedInBlock	= m_numUsedInBlock[m_readBlock];
246 	int			numToRead		= de::min(numElements, numUsedInBlock-m_readPos);
247 
248 	DE_ASSERT(numToRead > 0);
249 
250 	for (int ndx = 0; ndx < numToRead; ndx++)
251 		elements[ndx] = block[m_readPos+ndx];
252 
253 	m_readPos += numToRead;
254 
255 	if (m_readPos == numUsedInBlock)
256 	{
257 		/* Free current read block and advance. */
258 		m_readBlock		= (m_readBlock+1) % m_numBlocks;
259 		m_readPos		= 0;
260 		deSemaphore_increment(m_empty);
261 	}
262 
263 	return numToRead;
264 }
265 
266 template <typename T>
tryWrite(int numElements,const T * elements)267 int BlockBuffer<T>::tryWrite (int numElements, const T* elements)
268 {
269 	int numWritten = 0;
270 
271 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
272 
273 	if (m_canceled)
274 		throw CanceledException();
275 
276 	if (!m_writeLock.tryLock())
277 		return numWritten;
278 
279 	while (numWritten < numElements)
280 	{
281 		int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */);
282 
283 		if (ret == 0)
284 			break; /* Write failed. */
285 
286 		numWritten += ret;
287 	}
288 
289 	m_writeLock.unlock();
290 
291 	return numWritten;
292 }
293 
294 template <typename T>
write(int numElements,const T * elements)295 void BlockBuffer<T>::write (int numElements, const T* elements)
296 {
297 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
298 
299 	if (m_canceled)
300 		throw CanceledException();
301 
302 	m_writeLock.lock();
303 
304 	int numWritten = 0;
305 	while (numWritten < numElements)
306 		numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */);
307 
308 	m_writeLock.unlock();
309 }
310 
311 template <typename T>
flush(void)312 void BlockBuffer<T>::flush (void)
313 {
314 	m_writeLock.lock();
315 
316 	if (m_writePos > 0)
317 		flushWriteBlock();
318 
319 	m_writeLock.unlock();
320 }
321 
322 template <typename T>
tryFlush(void)323 bool BlockBuffer<T>::tryFlush (void)
324 {
325 	if (!m_writeLock.tryLock())
326 		return false;
327 
328 	if (m_writePos > 0)
329 		flushWriteBlock();
330 
331 	m_writeLock.unlock();
332 
333 	return true;
334 }
335 
336 template <typename T>
flushWriteBlock(void)337 void BlockBuffer<T>::flushWriteBlock (void)
338 {
339 	DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
340 
341 	m_numUsedInBlock[m_writeBlock]	= m_writePos;
342 	m_writeBlock					= (m_writeBlock+1) % m_numBlocks;
343 	m_writePos						= 0;
344 	deSemaphore_increment(m_fill);
345 }
346 
347 template <typename T>
tryRead(int numElements,T * elements)348 int BlockBuffer<T>::tryRead (int numElements, T* elements)
349 {
350 	int numRead = 0;
351 
352 	if (m_canceled)
353 		throw CanceledException();
354 
355 	if (!m_readLock.tryLock())
356 		return numRead;
357 
358 	while (numRead < numElements)
359 	{
360 		int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */);
361 
362 		if (ret == 0)
363 			break; /* Failed. */
364 
365 		numRead += ret;
366 	}
367 
368 	m_readLock.unlock();
369 
370 	return numRead;
371 }
372 
373 template <typename T>
read(int numElements,T * elements)374 void BlockBuffer<T>::read (int numElements, T* elements)
375 {
376 	DE_ASSERT(numElements > 0 && elements != DE_NULL);
377 
378 	if (m_canceled)
379 		throw CanceledException();
380 
381 	m_readLock.lock();
382 
383 	int numRead = 0;
384 	while (numRead < numElements)
385 		numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */);
386 
387 	m_readLock.unlock();
388 }
389 
390 } // de
391 
392 #endif // _DEBLOCKBUFFER_HPP
393