1 /*-------------------------------------------------------------------------
2  * drawElements C++ Base Library
3  * -----------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Block-based thread-safe queue.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28 
29 #include <vector>
30 
31 namespace de
32 {
33 
34 using std::vector;
35 
36 namespace BlockBufferBasicTest
37 {
38 
39 struct Message
40 {
41 	deUint32 data;
42 
Messagede::BlockBufferBasicTest::Message43 	Message (deUint16 threadId, deUint16 payload)
44 		: data((threadId << 16) | payload)
45 	{
46 	}
47 
Messagede::BlockBufferBasicTest::Message48 	Message (void)
49 		: data(0)
50 	{
51 	}
52 
getThreadIdde::BlockBufferBasicTest::Message53 	deUint16 getThreadId	(void) const { return data >> 16;		}
getPayloadde::BlockBufferBasicTest::Message54 	deUint16 getPayload		(void) const { return data & 0xffff;	}
55 };
56 
57 typedef BlockBuffer<Message> MessageBuffer;
58 
59 class Consumer : public Thread
60 {
61 public:
Consumer(MessageBuffer & buffer,int numProducers)62 	Consumer (MessageBuffer& buffer, int numProducers)
63 		: m_buffer		(buffer)
64 	{
65 		m_lastPayload.resize(numProducers, 0);
66 		m_payloadSum.resize(numProducers, 0);
67 	}
68 
run(void)69 	void run (void)
70 	{
71 		Random	rnd		((deUint32)m_lastPayload.size());
72 		Message	tmpBuf	[64];
73 		bool	consume	= true;
74 
75 		while (consume)
76 		{
77 			int numToRead	= rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
78 			int numRead		= m_buffer.tryRead(numToRead, &tmpBuf[0]);
79 
80 			for (int ndx = 0; ndx < numRead; ndx++)
81 			{
82 				const Message& msg = tmpBuf[ndx];
83 
84 				deUint16 threadId = msg.getThreadId();
85 
86 				if (threadId == 0xffff)
87 				{
88 					/* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
89 					if (ndx+1 < numRead)
90 					{
91 						m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
92 						m_buffer.flush();
93 					}
94 
95 					consume = false;
96 					break;
97 				}
98 				else
99 				{
100 					/* Verify message. */
101 					DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
102 					DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
103 
104 					m_lastPayload[threadId]	 = msg.getPayload();
105 					m_payloadSum[threadId]	+= (deUint32)msg.getPayload();
106 				}
107 			}
108 		}
109 	}
110 
getPayloadSum(deUint16 threadId) const111 	deUint32 getPayloadSum (deUint16 threadId) const
112 	{
113 		return m_payloadSum[threadId];
114 	}
115 
116 private:
117 	MessageBuffer&			m_buffer;
118 	vector<deUint16>		m_lastPayload;
119 	vector<deUint32>		m_payloadSum;
120 };
121 
122 class Producer : public Thread
123 {
124 public:
Producer(MessageBuffer & buffer,deUint16 threadId,int numMessages)125 	Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
126 		: m_buffer		(buffer)
127 		, m_threadId	(threadId)
128 		, m_numMessages	(numMessages)
129 	{
130 	}
131 
run(void)132 	void run (void)
133 	{
134 		// Yield to give main thread chance to start other producers.
135 		deSleep(1);
136 
137 		Random	rnd		(m_threadId);
138 		int		msgNdx	= 0;
139 		Message	tmpBuf[64];
140 
141 		while (msgNdx < m_numMessages)
142 		{
143 			int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
144 			for (int ndx = 0; ndx < writeSize; ndx++)
145 				tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
146 
147 			m_buffer.write(writeSize, &tmpBuf[0]);
148 			if (rnd.getBool())
149 				m_buffer.flush();
150 		}
151 	}
152 
153 private:
154 	MessageBuffer&	m_buffer;
155 	deUint16		m_threadId;
156 	int				m_numMessages;
157 };
158 
runTest(void)159 void runTest (void)
160 {
161 	const int numIterations = 8;
162 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
163 	{
164 		Random							rnd				(iterNdx);
165 		int								numBlocks		= rnd.getInt(2, 128);
166 		int								blockSize		= rnd.getInt(1, 16);
167 		int								numProducers	= rnd.getInt(1, 16);
168 		int								numConsumers	= rnd.getInt(1, 16);
169 		int								dataSize		= rnd.getInt(50, 200);
170 		MessageBuffer					buffer			(blockSize, numBlocks);
171 		vector<Producer*>				producers;
172 		vector<Consumer*>				consumers;
173 
174 		for (int i = 0; i < numProducers; i++)
175 			producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
176 
177 		for (int i = 0; i < numConsumers; i++)
178 			consumers.push_back(new Consumer(buffer, numProducers));
179 
180 		// Start consumers.
181 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
182 			(*i)->start();
183 
184 		// Start producers.
185 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
186 			(*i)->start();
187 
188 		// Wait for producers.
189 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
190 			(*i)->join();
191 
192 		// Write end messages for consumers.
193 		const Message endMsg(0xffff, 0);
194 		for (int i = 0; i < numConsumers; i++)
195 			buffer.write(1, &endMsg);
196 		buffer.flush();
197 
198 		// Wait for consumers.
199 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
200 			(*i)->join();
201 
202 		// Verify payload sums.
203 		deUint32 refSum = 0;
204 		for (int i = 0; i < dataSize; i++)
205 			refSum += (deUint32)(deUint16)i;
206 
207 		for (int i = 0; i < numProducers; i++)
208 		{
209 			deUint32 cmpSum = 0;
210 			for (int j = 0; j < numConsumers; j++)
211 				cmpSum += consumers[j]->getPayloadSum(i);
212 			DE_TEST_ASSERT(refSum == cmpSum);
213 		}
214 
215 		// Free resources.
216 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
217 			delete *i;
218 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
219 			delete *i;
220 	}
221 }
222 
223 } // BlockBufferBasicTest
224 
225 namespace BlockBufferCancelTest
226 {
227 
228 class Producer : public Thread
229 {
230 public:
Producer(BlockBuffer<deUint8> * buffer,deUint32 seed)231 	Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
232 		: m_buffer	(buffer)
233 		, m_seed	(seed)
234 	{
235 	}
236 
run(void)237 	void run (void)
238 	{
239 		deUint8	tmp[1024];
240 		Random	rnd(m_seed);
241 
242 		for (;;)
243 		{
244 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
245 
246 			try
247 			{
248 				m_buffer->write(blockSize, &tmp[0]);
249 
250 				if (rnd.getBool())
251 					m_buffer->flush();
252 			}
253 			catch (const BlockBuffer<deUint8>::CanceledException&)
254 			{
255 				break;
256 			}
257 		}
258 	}
259 
260 private:
261 	BlockBuffer<deUint8>*	m_buffer;
262 	deUint32				m_seed;
263 };
264 
265 class Consumer : public Thread
266 {
267 public:
Consumer(BlockBuffer<deUint8> * buffer,deUint32 seed)268 	Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
269 		: m_buffer	(buffer)
270 		, m_seed	(seed)
271 	{
272 	}
273 
run(void)274 	void run (void)
275 	{
276 		deUint8	tmp[1024];
277 		Random	rnd(m_seed);
278 
279 		for (;;)
280 		{
281 			int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
282 
283 			try
284 			{
285 				m_buffer->read(blockSize, &tmp[0]);
286 			}
287 			catch (const BlockBuffer<deUint8>::CanceledException&)
288 			{
289 				break;
290 			}
291 		}
292 	}
293 
294 private:
295 	BlockBuffer<deUint8>*	m_buffer;
296 	deUint32				m_seed;
297 };
298 
runTest(void)299 void runTest (void)
300 {
301 	BlockBuffer<deUint8>	buffer			(64, 16);
302 	const int				numIterations	= 8;
303 
304 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
305 	{
306 		Random				rnd				(deInt32Hash(iterNdx));
307 		int					numThreads		= rnd.getInt(1, 16);
308 		int					sleepMs			= rnd.getInt(1, 200);
309 		vector<Thread*>		threads;
310 
311 		for (int i = 0; i < numThreads; i++)
312 		{
313 			if (rnd.getBool())
314 				threads.push_back(new Consumer(&buffer, rnd.getUint32()));
315 			else
316 				threads.push_back(new Producer(&buffer, rnd.getUint32()));
317 		}
318 
319 		// Start threads.
320 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
321 			(*i)->start();
322 
323 		// Sleep for a while.
324 		deSleep(sleepMs);
325 
326 		// Cancel buffer.
327 		buffer.cancel();
328 
329 		// Wait for threads to finish.
330 		for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
331 			(*i)->join();
332 
333 		// Reset buffer.
334 		buffer.clear();
335 
336 		// Delete threads
337 		for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
338 			delete *thread;
339 	}
340 }
341 
342 } // BlockBufferCancelTest
343 
BlockBuffer_selfTest(void)344 void BlockBuffer_selfTest (void)
345 {
346 	BlockBufferBasicTest::runTest();
347 	BlockBufferCancelTest::runTest();
348 }
349 
350 } // de
351