1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Block-based thread-safe queue.
22 *//*--------------------------------------------------------------------*/
23
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28
29 #include <vector>
30
31 namespace de
32 {
33
34 using std::vector;
35
36 namespace BlockBufferBasicTest
37 {
38
39 struct Message
40 {
41 deUint32 data;
42
Messagede::BlockBufferBasicTest::Message43 Message (deUint16 threadId, deUint16 payload)
44 : data((threadId << 16) | payload)
45 {
46 }
47
Messagede::BlockBufferBasicTest::Message48 Message (void)
49 : data(0)
50 {
51 }
52
getThreadIdde::BlockBufferBasicTest::Message53 deUint16 getThreadId (void) const { return data >> 16; }
getPayloadde::BlockBufferBasicTest::Message54 deUint16 getPayload (void) const { return data & 0xffff; }
55 };
56
57 typedef BlockBuffer<Message> MessageBuffer;
58
59 class Consumer : public Thread
60 {
61 public:
Consumer(MessageBuffer & buffer,int numProducers)62 Consumer (MessageBuffer& buffer, int numProducers)
63 : m_buffer (buffer)
64 {
65 m_lastPayload.resize(numProducers, 0);
66 m_payloadSum.resize(numProducers, 0);
67 }
68
run(void)69 void run (void)
70 {
71 Random rnd ((deUint32)m_lastPayload.size());
72 Message tmpBuf [64];
73 bool consume = true;
74
75 while (consume)
76 {
77 int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
78 int numRead = m_buffer.tryRead(numToRead, &tmpBuf[0]);
79
80 for (int ndx = 0; ndx < numRead; ndx++)
81 {
82 const Message& msg = tmpBuf[ndx];
83
84 deUint16 threadId = msg.getThreadId();
85
86 if (threadId == 0xffff)
87 {
88 /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
89 if (ndx+1 < numRead)
90 {
91 m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
92 m_buffer.flush();
93 }
94
95 consume = false;
96 break;
97 }
98 else
99 {
100 /* Verify message. */
101 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
102 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
103
104 m_lastPayload[threadId] = msg.getPayload();
105 m_payloadSum[threadId] += (deUint32)msg.getPayload();
106 }
107 }
108 }
109 }
110
getPayloadSum(deUint16 threadId) const111 deUint32 getPayloadSum (deUint16 threadId) const
112 {
113 return m_payloadSum[threadId];
114 }
115
116 private:
117 MessageBuffer& m_buffer;
118 vector<deUint16> m_lastPayload;
119 vector<deUint32> m_payloadSum;
120 };
121
122 class Producer : public Thread
123 {
124 public:
Producer(MessageBuffer & buffer,deUint16 threadId,int numMessages)125 Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
126 : m_buffer (buffer)
127 , m_threadId (threadId)
128 , m_numMessages (numMessages)
129 {
130 }
131
run(void)132 void run (void)
133 {
134 // Yield to give main thread chance to start other producers.
135 deSleep(1);
136
137 Random rnd (m_threadId);
138 int msgNdx = 0;
139 Message tmpBuf[64];
140
141 while (msgNdx < m_numMessages)
142 {
143 int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
144 for (int ndx = 0; ndx < writeSize; ndx++)
145 tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
146
147 m_buffer.write(writeSize, &tmpBuf[0]);
148 if (rnd.getBool())
149 m_buffer.flush();
150 }
151 }
152
153 private:
154 MessageBuffer& m_buffer;
155 deUint16 m_threadId;
156 int m_numMessages;
157 };
158
runTest(void)159 void runTest (void)
160 {
161 const int numIterations = 8;
162 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
163 {
164 Random rnd (iterNdx);
165 int numBlocks = rnd.getInt(2, 128);
166 int blockSize = rnd.getInt(1, 16);
167 int numProducers = rnd.getInt(1, 16);
168 int numConsumers = rnd.getInt(1, 16);
169 int dataSize = rnd.getInt(50, 200);
170 MessageBuffer buffer (blockSize, numBlocks);
171 vector<Producer*> producers;
172 vector<Consumer*> consumers;
173
174 for (int i = 0; i < numProducers; i++)
175 producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
176
177 for (int i = 0; i < numConsumers; i++)
178 consumers.push_back(new Consumer(buffer, numProducers));
179
180 // Start consumers.
181 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
182 (*i)->start();
183
184 // Start producers.
185 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
186 (*i)->start();
187
188 // Wait for producers.
189 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
190 (*i)->join();
191
192 // Write end messages for consumers.
193 const Message endMsg(0xffff, 0);
194 for (int i = 0; i < numConsumers; i++)
195 buffer.write(1, &endMsg);
196 buffer.flush();
197
198 // Wait for consumers.
199 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
200 (*i)->join();
201
202 // Verify payload sums.
203 deUint32 refSum = 0;
204 for (int i = 0; i < dataSize; i++)
205 refSum += (deUint32)(deUint16)i;
206
207 for (int i = 0; i < numProducers; i++)
208 {
209 deUint32 cmpSum = 0;
210 for (int j = 0; j < numConsumers; j++)
211 cmpSum += consumers[j]->getPayloadSum(i);
212 DE_TEST_ASSERT(refSum == cmpSum);
213 }
214
215 // Free resources.
216 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
217 delete *i;
218 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
219 delete *i;
220 }
221 }
222
223 } // BlockBufferBasicTest
224
225 namespace BlockBufferCancelTest
226 {
227
228 class Producer : public Thread
229 {
230 public:
Producer(BlockBuffer<deUint8> * buffer,deUint32 seed)231 Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
232 : m_buffer (buffer)
233 , m_seed (seed)
234 {
235 }
236
run(void)237 void run (void)
238 {
239 deUint8 tmp[1024];
240 Random rnd(m_seed);
241
242 for (;;)
243 {
244 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
245
246 try
247 {
248 m_buffer->write(blockSize, &tmp[0]);
249
250 if (rnd.getBool())
251 m_buffer->flush();
252 }
253 catch (const BlockBuffer<deUint8>::CanceledException&)
254 {
255 break;
256 }
257 }
258 }
259
260 private:
261 BlockBuffer<deUint8>* m_buffer;
262 deUint32 m_seed;
263 };
264
265 class Consumer : public Thread
266 {
267 public:
Consumer(BlockBuffer<deUint8> * buffer,deUint32 seed)268 Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
269 : m_buffer (buffer)
270 , m_seed (seed)
271 {
272 }
273
run(void)274 void run (void)
275 {
276 deUint8 tmp[1024];
277 Random rnd(m_seed);
278
279 for (;;)
280 {
281 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
282
283 try
284 {
285 m_buffer->read(blockSize, &tmp[0]);
286 }
287 catch (const BlockBuffer<deUint8>::CanceledException&)
288 {
289 break;
290 }
291 }
292 }
293
294 private:
295 BlockBuffer<deUint8>* m_buffer;
296 deUint32 m_seed;
297 };
298
runTest(void)299 void runTest (void)
300 {
301 BlockBuffer<deUint8> buffer (64, 16);
302 const int numIterations = 8;
303
304 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
305 {
306 Random rnd (deInt32Hash(iterNdx));
307 int numThreads = rnd.getInt(1, 16);
308 int sleepMs = rnd.getInt(1, 200);
309 vector<Thread*> threads;
310
311 for (int i = 0; i < numThreads; i++)
312 {
313 if (rnd.getBool())
314 threads.push_back(new Consumer(&buffer, rnd.getUint32()));
315 else
316 threads.push_back(new Producer(&buffer, rnd.getUint32()));
317 }
318
319 // Start threads.
320 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
321 (*i)->start();
322
323 // Sleep for a while.
324 deSleep(sleepMs);
325
326 // Cancel buffer.
327 buffer.cancel();
328
329 // Wait for threads to finish.
330 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
331 (*i)->join();
332
333 // Reset buffer.
334 buffer.clear();
335
336 // Delete threads
337 for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
338 delete *thread;
339 }
340 }
341
342 } // BlockBufferCancelTest
343
BlockBuffer_selfTest(void)344 void BlockBuffer_selfTest (void)
345 {
346 BlockBufferBasicTest::runTest();
347 BlockBufferCancelTest::runTest();
348 }
349
350 } // de
351