1 /*-------------------------------------------------------------------------
2  * drawElements C++ Base Library
3  * -----------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Thread-safe ring buffer template.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "deThreadSafeRingBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 
28 #include <vector>
29 
30 using std::vector;
31 
32 namespace de
33 {
34 
35 namespace
36 {
37 
38 struct Message
39 {
40 	deUint32 data;
41 
Messagede::__anondb4898ff0111::Message42 	Message (deUint16 threadId, deUint16 payload)
43 		: data((threadId << 16) | payload)
44 	{
45 	}
46 
Messagede::__anondb4898ff0111::Message47 	Message (void)
48 		: data(0)
49 	{
50 	}
51 
getThreadIdde::__anondb4898ff0111::Message52 	deUint16 getThreadId	(void) const { return data >> 16;		}
getPayloadde::__anondb4898ff0111::Message53 	deUint16 getPayload		(void) const { return data & 0xffff;	}
54 };
55 
56 class Consumer : public Thread
57 {
58 public:
Consumer(ThreadSafeRingBuffer<Message> & buffer,int numProducers)59 	Consumer (ThreadSafeRingBuffer<Message>& buffer, int numProducers)
60 		: m_buffer		(buffer)
61 	{
62 		m_lastPayload.resize(numProducers, 0);
63 		m_payloadSum.resize(numProducers, 0);
64 	}
65 
run(void)66 	void run (void)
67 	{
68 		for (;;)
69 		{
70 			Message msg = m_buffer.popBack();
71 
72 			deUint16 threadId = msg.getThreadId();
73 
74 			if (threadId == 0xffff)
75 				break;
76 
77 			DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
78 			DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
79 
80 			m_lastPayload[threadId]	 = msg.getPayload();
81 			m_payloadSum[threadId]	+= (deUint32)msg.getPayload();
82 		}
83 	}
84 
getPayloadSum(deUint16 threadId) const85 	deUint32 getPayloadSum (deUint16 threadId) const
86 	{
87 		return m_payloadSum[threadId];
88 	}
89 
90 private:
91 	ThreadSafeRingBuffer<Message>&	m_buffer;
92 	vector<deUint16>				m_lastPayload;
93 	vector<deUint32>				m_payloadSum;
94 };
95 
96 class Producer : public Thread
97 {
98 public:
Producer(ThreadSafeRingBuffer<Message> & buffer,deUint16 threadId,int dataSize)99 	Producer (ThreadSafeRingBuffer<Message>& buffer, deUint16 threadId, int dataSize)
100 		: m_buffer		(buffer)
101 		, m_threadId	(threadId)
102 		, m_dataSize	(dataSize)
103 	{
104 	}
105 
run(void)106 	void run (void)
107 	{
108 		// Yield to give main thread chance to start other producers.
109 		deSleep(1);
110 
111 		for (int ndx = 0; ndx < m_dataSize; ndx++)
112 			m_buffer.pushFront(Message(m_threadId, (deUint16)ndx));
113 	}
114 
115 private:
116 	ThreadSafeRingBuffer<Message>&	m_buffer;
117 	deUint16						m_threadId;
118 	int								m_dataSize;
119 };
120 
121 } // anonymous
122 
ThreadSafeRingBuffer_selfTest(void)123 void ThreadSafeRingBuffer_selfTest (void)
124 {
125 	const int numIterations = 16;
126 	for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
127 	{
128 		Random							rnd				(iterNdx);
129 		int								bufSize			= rnd.getInt(1, 2048);
130 		int								numProducers	= rnd.getInt(1, 16);
131 		int								numConsumers	= rnd.getInt(1, 16);
132 		int								dataSize		= rnd.getInt(1000, 10000);
133 		ThreadSafeRingBuffer<Message>	buffer			(bufSize);
134 		vector<Producer*>				producers;
135 		vector<Consumer*>				consumers;
136 
137 		for (int i = 0; i < numProducers; i++)
138 			producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
139 
140 		for (int i = 0; i < numConsumers; i++)
141 			consumers.push_back(new Consumer(buffer, numProducers));
142 
143 		// Start consumers.
144 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
145 			(*i)->start();
146 
147 		// Start producers.
148 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
149 			(*i)->start();
150 
151 		// Wait for producers.
152 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
153 			(*i)->join();
154 
155 		// Write end messages for consumers.
156 		for (int i = 0; i < numConsumers; i++)
157 			buffer.pushFront(Message(0xffff, 0));
158 
159 		// Wait for consumers.
160 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
161 			(*i)->join();
162 
163 		// Verify payload sums.
164 		deUint32 refSum = 0;
165 		for (int i = 0; i < dataSize; i++)
166 			refSum += (deUint32)(deUint16)i;
167 
168 		for (int i = 0; i < numProducers; i++)
169 		{
170 			deUint32 cmpSum = 0;
171 			for (int j = 0; j < numConsumers; j++)
172 				cmpSum += consumers[j]->getPayloadSum(i);
173 			DE_TEST_ASSERT(refSum == cmpSum);
174 		}
175 
176 		// Free resources.
177 		for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
178 			delete *i;
179 		for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
180 			delete *i;
181 	}
182 }
183 
184 } // de
185