1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* ThreadPool */
18
19 #include "sles_allinclusive.h"
20
21 // Entry point for each worker thread
22
ThreadPool_start(void * context)23 static void *ThreadPool_start(void *context)
24 {
25 ThreadPool *tp = (ThreadPool *) context;
26 assert(NULL != tp);
27 for (;;) {
28 Closure *pClosure = ThreadPool_remove(tp);
29 // closure is NULL when thread pool is being destroyed
30 if (NULL == pClosure) {
31 break;
32 }
33 // make a copy of parameters, then free the parameters
34 const Closure closure = *pClosure;
35 free(pClosure);
36 // extract parameters and call the right method depending on kind
37 ClosureKind kind = closure.mKind;
38 void *context1 = closure.mContext1;
39 void *context2 = closure.mContext2;
40 int parameter1 = closure.mParameter1;
41 switch (kind) {
42 case CLOSURE_KIND_PPI:
43 {
44 ClosureHandler_ppi handler_ppi = closure.mHandler.mHandler_ppi;
45 assert(NULL != handler_ppi);
46 (*handler_ppi)(context1, context2, parameter1);
47 }
48 break;
49 case CLOSURE_KIND_PPII:
50 {
51 ClosureHandler_ppii handler_ppii = closure.mHandler.mHandler_ppii;
52 assert(NULL != handler_ppii);
53 int parameter2 = closure.mParameter2;
54 (*handler_ppii)(context1, context2, parameter1, parameter2);
55 }
56 break;
57 case CLOSURE_KIND_PIIPP:
58 {
59 ClosureHandler_piipp handler_piipp = closure.mHandler.mHandler_piipp;
60 assert(NULL != handler_piipp);
61 int parameter2 = closure.mParameter2;
62 void *context3 = closure.mContext3;
63 (*handler_piipp)(context1, parameter1, parameter2, context2, context3);
64 }
65 break;
66 default:
67 SL_LOGE("Unexpected callback kind %d", kind);
68 assert(false);
69 break;
70 }
71 }
72 return NULL;
73 }
74
75 #define INITIALIZED_NONE 0
76 #define INITIALIZED_MUTEX 1
77 #define INITIALIZED_CONDNOTFULL 2
78 #define INITIALIZED_CONDNOTEMPTY 4
79 #define INITIALIZED_ALL 7
80
81 static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads);
82
83 // Initialize a ThreadPool
84 // maxClosures defaults to CLOSURE_TYPICAL if 0
85 // maxThreads defaults to THREAD_TYPICAL if 0
86
ThreadPool_init(ThreadPool * tp,unsigned maxClosures,unsigned maxThreads)87 SLresult ThreadPool_init(ThreadPool *tp, unsigned maxClosures, unsigned maxThreads)
88 {
89 assert(NULL != tp);
90 memset(tp, 0, sizeof(ThreadPool));
91 tp->mShutdown = SL_BOOLEAN_FALSE;
92 unsigned initialized = INITIALIZED_NONE; // which objects were successfully initialized
93 unsigned nThreads = 0; // number of threads successfully created
94 int err;
95 SLresult result;
96
97 // initialize mutex and condition variables
98 err = pthread_mutex_init(&tp->mMutex, (const pthread_mutexattr_t *) NULL);
99 result = err_to_result(err);
100 if (SL_RESULT_SUCCESS != result)
101 goto fail;
102 initialized |= INITIALIZED_MUTEX;
103 err = pthread_cond_init(&tp->mCondNotFull, (const pthread_condattr_t *) NULL);
104 result = err_to_result(err);
105 if (SL_RESULT_SUCCESS != result)
106 goto fail;
107 initialized |= INITIALIZED_CONDNOTFULL;
108 err = pthread_cond_init(&tp->mCondNotEmpty, (const pthread_condattr_t *) NULL);
109 result = err_to_result(err);
110 if (SL_RESULT_SUCCESS != result)
111 goto fail;
112 initialized |= INITIALIZED_CONDNOTEMPTY;
113
114 // use default values for parameters, if not specified explicitly
115 tp->mWaitingNotFull = 0;
116 tp->mWaitingNotEmpty = 0;
117 if (0 == maxClosures)
118 maxClosures = CLOSURE_TYPICAL;
119 tp->mMaxClosures = maxClosures;
120 if (0 == maxThreads)
121 maxThreads = THREAD_TYPICAL;
122 tp->mMaxThreads = maxThreads;
123
124 // initialize circular buffer for closures
125 if (CLOSURE_TYPICAL >= maxClosures) {
126 tp->mClosureArray = tp->mClosureTypical;
127 } else {
128 tp->mClosureArray = (Closure **) malloc((maxClosures + 1) * sizeof(Closure *));
129 if (NULL == tp->mClosureArray) {
130 result = SL_RESULT_RESOURCE_ERROR;
131 goto fail;
132 }
133 }
134 tp->mClosureFront = tp->mClosureArray;
135 tp->mClosureRear = tp->mClosureArray;
136
137 // initialize thread pool
138 if (THREAD_TYPICAL >= maxThreads) {
139 tp->mThreadArray = tp->mThreadTypical;
140 } else {
141 tp->mThreadArray = (pthread_t *) malloc(maxThreads * sizeof(pthread_t));
142 if (NULL == tp->mThreadArray) {
143 result = SL_RESULT_RESOURCE_ERROR;
144 goto fail;
145 }
146 }
147 unsigned i;
148 for (i = 0; i < maxThreads; ++i) {
149 int err = pthread_create(&tp->mThreadArray[i], (const pthread_attr_t *) NULL,
150 ThreadPool_start, tp);
151 result = err_to_result(err);
152 if (SL_RESULT_SUCCESS != result)
153 goto fail;
154 ++nThreads;
155 }
156 tp->mInitialized = initialized;
157
158 // done
159 return SL_RESULT_SUCCESS;
160
161 // here on any kind of error
162 fail:
163 ThreadPool_deinit_internal(tp, initialized, nThreads);
164 return result;
165 }
166
ThreadPool_deinit_internal(ThreadPool * tp,unsigned initialized,unsigned nThreads)167 static void ThreadPool_deinit_internal(ThreadPool *tp, unsigned initialized, unsigned nThreads)
168 {
169 int ok;
170
171 assert(NULL != tp);
172 // Destroy all threads
173 if (0 < nThreads) {
174 assert(INITIALIZED_ALL == initialized);
175 ok = pthread_mutex_lock(&tp->mMutex);
176 assert(0 == ok);
177 tp->mShutdown = SL_BOOLEAN_TRUE;
178 ok = pthread_cond_broadcast(&tp->mCondNotEmpty);
179 assert(0 == ok);
180 ok = pthread_cond_broadcast(&tp->mCondNotFull);
181 assert(0 == ok);
182 ok = pthread_mutex_unlock(&tp->mMutex);
183 assert(0 == ok);
184 unsigned i;
185 for (i = 0; i < nThreads; ++i) {
186 ok = pthread_join(tp->mThreadArray[i], (void **) NULL);
187 assert(ok == 0);
188 }
189
190 // Empty out the circular buffer of closures
191 ok = pthread_mutex_lock(&tp->mMutex);
192 assert(0 == ok);
193 Closure **oldFront = tp->mClosureFront;
194 while (oldFront != tp->mClosureRear) {
195 Closure **newFront = oldFront;
196 if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1])
197 newFront = tp->mClosureArray;
198 Closure *pClosure = *oldFront;
199 assert(NULL != pClosure);
200 *oldFront = NULL;
201 tp->mClosureFront = newFront;
202 ok = pthread_mutex_unlock(&tp->mMutex);
203 assert(0 == ok);
204 free(pClosure);
205 ok = pthread_mutex_lock(&tp->mMutex);
206 assert(0 == ok);
207 }
208 ok = pthread_mutex_unlock(&tp->mMutex);
209 assert(0 == ok);
210 // Note that we can't be sure when mWaitingNotFull will drop to zero
211 }
212
213 // destroy the mutex and condition variables
214 if (initialized & INITIALIZED_CONDNOTEMPTY) {
215 ok = pthread_cond_destroy(&tp->mCondNotEmpty);
216 assert(0 == ok);
217 }
218 if (initialized & INITIALIZED_CONDNOTFULL) {
219 ok = pthread_cond_destroy(&tp->mCondNotFull);
220 assert(0 == ok);
221 }
222 if (initialized & INITIALIZED_MUTEX) {
223 ok = pthread_mutex_destroy(&tp->mMutex);
224 assert(0 == ok);
225 }
226 tp->mInitialized = INITIALIZED_NONE;
227
228 // release the closure circular buffer
229 if (tp->mClosureTypical != tp->mClosureArray && NULL != tp->mClosureArray) {
230 free(tp->mClosureArray);
231 tp->mClosureArray = NULL;
232 }
233
234 // release the thread pool
235 if (tp->mThreadTypical != tp->mThreadArray && NULL != tp->mThreadArray) {
236 free(tp->mThreadArray);
237 tp->mThreadArray = NULL;
238 }
239
240 }
241
ThreadPool_deinit(ThreadPool * tp)242 void ThreadPool_deinit(ThreadPool *tp)
243 {
244 ThreadPool_deinit_internal(tp, tp->mInitialized, tp->mMaxThreads);
245 }
246
247 // Enqueue a closure to be executed later by a worker thread.
248 // Note that this raw interface requires an explicit "kind" and full parameter list.
249 // There are convenience methods below that make this easier to use.
ThreadPool_add(ThreadPool * tp,ClosureKind kind,ClosureHandler_generic handler,void * context1,void * context2,void * context3,int parameter1,int parameter2)250 SLresult ThreadPool_add(ThreadPool *tp, ClosureKind kind, ClosureHandler_generic handler,
251 void *context1, void *context2, void *context3, int parameter1, int parameter2)
252 {
253 assert(NULL != tp);
254 assert(NULL != handler);
255 Closure *closure = (Closure *) malloc(sizeof(Closure));
256 if (NULL == closure) {
257 return SL_RESULT_RESOURCE_ERROR;
258 }
259 closure->mKind = kind;
260 switch (kind) {
261 case CLOSURE_KIND_PPI:
262 closure->mHandler.mHandler_ppi = (ClosureHandler_ppi)handler;
263 break;
264 case CLOSURE_KIND_PPII:
265 closure->mHandler.mHandler_ppii = (ClosureHandler_ppii)handler;
266 break;
267 case CLOSURE_KIND_PIIPP:
268 closure->mHandler.mHandler_piipp = (ClosureHandler_piipp)handler;
269 break;
270 default:
271 SL_LOGE("ThreadPool_add() invalid closure kind %d", kind);
272 assert(false);
273 }
274 closure->mContext1 = context1;
275 closure->mContext2 = context2;
276 closure->mContext3 = context3;
277 closure->mParameter1 = parameter1;
278 closure->mParameter2 = parameter2;
279 int ok;
280 ok = pthread_mutex_lock(&tp->mMutex);
281 assert(0 == ok);
282 // can't enqueue while thread pool shutting down
283 if (tp->mShutdown) {
284 ok = pthread_mutex_unlock(&tp->mMutex);
285 assert(0 == ok);
286 free(closure);
287 return SL_RESULT_PRECONDITIONS_VIOLATED;
288 }
289 for (;;) {
290 Closure **oldRear = tp->mClosureRear;
291 Closure **newRear = oldRear;
292 if (++newRear == &tp->mClosureArray[tp->mMaxClosures + 1])
293 newRear = tp->mClosureArray;
294 // if closure circular buffer is full, then wait for it to become non-full
295 if (newRear == tp->mClosureFront) {
296 ++tp->mWaitingNotFull;
297 ok = pthread_cond_wait(&tp->mCondNotFull, &tp->mMutex);
298 assert(0 == ok);
299 // can't enqueue while thread pool shutting down
300 if (tp->mShutdown) {
301 assert(0 < tp->mWaitingNotFull);
302 --tp->mWaitingNotFull;
303 ok = pthread_mutex_unlock(&tp->mMutex);
304 assert(0 == ok);
305 free(closure);
306 return SL_RESULT_PRECONDITIONS_VIOLATED;
307 }
308 continue;
309 }
310 assert(NULL == *oldRear);
311 *oldRear = closure;
312 tp->mClosureRear = newRear;
313 // if a worker thread was waiting to dequeue, then suggest that it try again
314 if (0 < tp->mWaitingNotEmpty) {
315 --tp->mWaitingNotEmpty;
316 ok = pthread_cond_signal(&tp->mCondNotEmpty);
317 assert(0 == ok);
318 }
319 break;
320 }
321 ok = pthread_mutex_unlock(&tp->mMutex);
322 assert(0 == ok);
323 return SL_RESULT_SUCCESS;
324 }
325
326 // Called by a worker thread when it is ready to accept the next closure to execute
ThreadPool_remove(ThreadPool * tp)327 Closure *ThreadPool_remove(ThreadPool *tp)
328 {
329 Closure *pClosure;
330 int ok;
331 ok = pthread_mutex_lock(&tp->mMutex);
332 assert(0 == ok);
333 for (;;) {
334 // fail if thread pool is shutting down
335 if (tp->mShutdown) {
336 pClosure = NULL;
337 break;
338 }
339 Closure **oldFront = tp->mClosureFront;
340 // if closure circular buffer is empty, then wait for it to become non-empty
341 if (oldFront == tp->mClosureRear) {
342 ++tp->mWaitingNotEmpty;
343 ok = pthread_cond_wait(&tp->mCondNotEmpty, &tp->mMutex);
344 assert(0 == ok);
345 // try again
346 continue;
347 }
348 // dequeue the closure at front of circular buffer
349 Closure **newFront = oldFront;
350 if (++newFront == &tp->mClosureArray[tp->mMaxClosures + 1]) {
351 newFront = tp->mClosureArray;
352 }
353 pClosure = *oldFront;
354 assert(NULL != pClosure);
355 *oldFront = NULL;
356 tp->mClosureFront = newFront;
357 // if a client thread was waiting to enqueue, then suggest that it try again
358 if (0 < tp->mWaitingNotFull) {
359 --tp->mWaitingNotFull;
360 ok = pthread_cond_signal(&tp->mCondNotFull);
361 assert(0 == ok);
362 }
363 break;
364 }
365 ok = pthread_mutex_unlock(&tp->mMutex);
366 assert(0 == ok);
367 return pClosure;
368 }
369
370 // Convenience methods for applications
ThreadPool_add_ppi(ThreadPool * tp,ClosureHandler_ppi handler,void * context1,void * context2,int parameter1)371 SLresult ThreadPool_add_ppi(ThreadPool *tp, ClosureHandler_ppi handler,
372 void *context1, void *context2, int parameter1)
373 {
374 // function pointers are the same size so this is a safe cast
375 return ThreadPool_add(tp, CLOSURE_KIND_PPI, (ClosureHandler_generic) handler,
376 context1, context2, NULL, parameter1, 0);
377 }
378
ThreadPool_add_ppii(ThreadPool * tp,ClosureHandler_ppii handler,void * context1,void * context2,int parameter1,int parameter2)379 SLresult ThreadPool_add_ppii(ThreadPool *tp, ClosureHandler_ppii handler,
380 void *context1, void *context2, int parameter1, int parameter2)
381 {
382 // function pointers are the same size so this is a safe cast
383 return ThreadPool_add(tp, CLOSURE_KIND_PPII, (ClosureHandler_generic) handler,
384 context1, context2, NULL, parameter1, parameter2);
385 }
386
ThreadPool_add_piipp(ThreadPool * tp,ClosureHandler_piipp handler,void * cntxt1,int param1,int param2,void * cntxt2,void * cntxt3)387 SLresult ThreadPool_add_piipp(ThreadPool *tp, ClosureHandler_piipp handler,
388 void *cntxt1, int param1, int param2, void *cntxt2, void *cntxt3)
389 {
390 // function pointers are the same size so this is a safe cast
391 return ThreadPool_add(tp, CLOSURE_KIND_PIIPP, (ClosureHandler_generic) handler,
392 cntxt1, cntxt2, cntxt3, param1, param2);
393 }
394