1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "WorkerPool.h"
18 //#include <atomic>
19 #include <stdlib.h>
20 #include <unistd.h>
21 #include <errno.h>
22 #include <android/log.h>
23 
24 
25 //static pthread_key_t gThreadTLSKey = 0;
26 //static uint32_t gThreadTLSKeyCount = 0;
27 //static pthread_mutex_t gInitMutex = PTHREAD_MUTEX_INITIALIZER;
28 
29 
Signal()30 WorkerPool::Signal::Signal() {
31     mSet = true;
32 }
33 
~Signal()34 WorkerPool::Signal::~Signal() {
35     pthread_mutex_destroy(&mMutex);
36     pthread_cond_destroy(&mCondition);
37 }
38 
init()39 bool WorkerPool::Signal::init() {
40     int status = pthread_mutex_init(&mMutex, NULL);
41     if (status) {
42         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool mutex init failure");
43         return false;
44     }
45 
46     status = pthread_cond_init(&mCondition, NULL);
47     if (status) {
48         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool condition init failure");
49         pthread_mutex_destroy(&mMutex);
50         return false;
51     }
52 
53     return true;
54 }
55 
set()56 void WorkerPool::Signal::set() {
57     int status;
58 
59     status = pthread_mutex_lock(&mMutex);
60     if (status) {
61         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for set condition.", status);
62         return;
63     }
64 
65     mSet = true;
66 
67     status = pthread_cond_signal(&mCondition);
68     if (status) {
69         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i on set condition.", status);
70     }
71 
72     status = pthread_mutex_unlock(&mMutex);
73     if (status) {
74         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for set condition.", status);
75     }
76 }
77 
wait(uint64_t timeout)78 bool WorkerPool::Signal::wait(uint64_t timeout) {
79     int status;
80     bool ret = false;
81 
82     status = pthread_mutex_lock(&mMutex);
83     if (status) {
84         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i locking for condition.", status);
85         return false;
86     }
87 
88     if (!mSet) {
89         if (!timeout) {
90             status = pthread_cond_wait(&mCondition, &mMutex);
91         } else {
92 #if defined(HAVE_PTHREAD_COND_TIMEDWAIT_RELATIVE)
93             status = pthread_cond_timeout_np(&mCondition, &mMutex, timeout / 1000000);
94 #else
95             // This is safe it will just make things less reponsive
96             status = pthread_cond_wait(&mCondition, &mMutex);
97 #endif
98         }
99     }
100 
101     if (!status) {
102         mSet = false;
103         ret = true;
104     } else {
105 #ifndef RS_SERVER
106         if (status != ETIMEDOUT) {
107             __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i waiting for condition.", status);
108         }
109 #endif
110     }
111 
112     status = pthread_mutex_unlock(&mMutex);
113     if (status) {
114         __android_log_print(ANDROID_LOG_INFO, "bench", "WorkerPool: error %i unlocking for condition.", status);
115     }
116 
117     return ret;
118 }
119 
120 
121 
WorkerPool()122 WorkerPool::WorkerPool() {
123     mExit = false;
124     mRunningCount = 0;
125     mLaunchCount = 0;
126     mCount = 0;
127     mThreadId = NULL;
128     mNativeThreadId = NULL;
129     mLaunchSignals = NULL;
130     mLaunchCallback = NULL;
131 
132 
133 }
134 
135 
~WorkerPool()136 WorkerPool::~WorkerPool() {
137 __android_log_print(ANDROID_LOG_INFO, "bench", "~wp");
138     mExit = true;
139     mLaunchData = NULL;
140     mLaunchCallback = NULL;
141     mRunningCount = mCount;
142 
143     __sync_synchronize();
144     for (uint32_t ct = 0; ct < mCount; ct++) {
145         mLaunchSignals[ct].set();
146     }
147     void *res;
148     for (uint32_t ct = 0; ct < mCount; ct++) {
149         pthread_join(mThreadId[ct], &res);
150     }
151     //rsAssert(__sync_fetch_and_or(&mRunningCount, 0) == 0);
152     free(mThreadId);
153     free(mNativeThreadId);
154     delete[] mLaunchSignals;
155 }
156 
init(int threadCount)157 bool WorkerPool::init(int threadCount) {
158     int cpu = sysconf(_SC_NPROCESSORS_CONF);
159     if (threadCount > 0) {
160         cpu = threadCount;
161     }
162     if (cpu < 1) {
163         return false;
164     }
165     mCount = (uint32_t)cpu;
166 
167     __android_log_print(ANDROID_LOG_INFO, "Bench", "ThreadLaunch %i", mCount);
168 
169     mThreadId = (pthread_t *) calloc(mCount, sizeof(pthread_t));
170     mNativeThreadId = (pid_t *) calloc(mCount, sizeof(pid_t));
171     mLaunchSignals = new Signal[mCount];
172     mLaunchCallback = NULL;
173 
174     mCompleteSignal.init();
175     mRunningCount = mCount;
176     mLaunchCount = 0;
177     __sync_synchronize();
178 
179     pthread_attr_t threadAttr;
180     int status = pthread_attr_init(&threadAttr);
181     if (status) {
182         __android_log_print(ANDROID_LOG_INFO, "bench", "Failed to init thread attribute.");
183         return false;
184     }
185 
186     for (uint32_t ct=0; ct < mCount; ct++) {
187         status = pthread_create(&mThreadId[ct], &threadAttr, helperThreadProc, this);
188         if (status) {
189             mCount = ct;
190             __android_log_print(ANDROID_LOG_INFO, "bench", "Created fewer than expected number of threads.");
191             return false;
192         }
193     }
194     while (__sync_fetch_and_or(&mRunningCount, 0) != 0) {
195         usleep(100);
196     }
197 
198     pthread_attr_destroy(&threadAttr);
199     return true;
200 }
201 
helperThreadProc(void * vwp)202 void * WorkerPool::helperThreadProc(void *vwp) {
203     WorkerPool *wp = (WorkerPool *)vwp;
204 
205     uint32_t idx = __sync_fetch_and_add(&wp->mLaunchCount, 1);
206 
207     wp->mLaunchSignals[idx].init();
208     wp->mNativeThreadId[idx] = gettid();
209 
210     while (!wp->mExit) {
211         wp->mLaunchSignals[idx].wait();
212         if (wp->mLaunchCallback) {
213            // idx +1 is used because the calling thread is always worker 0.
214            wp->mLaunchCallback(wp->mLaunchData, idx);
215         }
216         __sync_fetch_and_sub(&wp->mRunningCount, 1);
217         wp->mCompleteSignal.set();
218     }
219 
220     //ALOGV("RS helperThread exited %p idx=%i", dc, idx);
221     return NULL;
222 }
223 
224 
waitForAll() const225 void WorkerPool::waitForAll() const {
226 }
227 
waitFor(uint64_t) const228 void WorkerPool::waitFor(uint64_t) const {
229 }
230 
231 
232 
launchWork(WorkerCallback_t cb,void * usr,int maxThreads)233 uint64_t WorkerPool::launchWork(WorkerCallback_t cb, void *usr, int maxThreads) {
234     //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 1");
235     mLaunchData = usr;
236     mLaunchCallback = cb;
237 
238     if (maxThreads < 1) {
239         maxThreads = mCount;
240     }
241     if ((uint32_t)maxThreads > mCount) {
242         //__android_log_print(ANDROID_LOG_INFO, "bench", "launchWork max > count", maxThreads, mCount);
243         maxThreads = mCount;
244     }
245 
246     //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 2  %i  %i  %i", maxThreads, mRunningCount, mCount);
247     mRunningCount = maxThreads;
248     __sync_synchronize();
249 
250     for (int ct = 0; ct < maxThreads; ct++) {
251         mLaunchSignals[ct].set();
252     }
253 
254     //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3    %i", mRunningCount);
255     while (__sync_fetch_and_or(&mRunningCount, 0) != 0) {
256         //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 3.1    %i", mRunningCount);
257         mCompleteSignal.wait();
258     }
259 
260     //__android_log_print(ANDROID_LOG_INFO, "bench", "lw 4    %i", mRunningCount);
261     return 0;
262 
263 }
264 
265 
266 
267