1 // Copyright 2011 Google Inc. All Rights Reserved. 2 // 3 // Use of this source code is governed by a BSD-style license 4 // that can be found in the COPYING file in the root of the source 5 // tree. An additional intellectual property rights grant can be found 6 // in the file PATENTS. All contributing project authors may 7 // be found in the AUTHORS file in the root of the source tree. 8 // ----------------------------------------------------------------------------- 9 // 10 // Multi-threaded worker 11 // 12 // Author: Skal (pascal.massimino@gmail.com) 13 14 #include <assert.h> 15 #include <string.h> // for memset() 16 #include "src/utils/thread_utils.h" 17 #include "src/utils/utils.h" 18 19 #ifdef WEBP_USE_THREAD 20 21 #if defined(_WIN32) 22 23 #include <windows.h> 24 typedef HANDLE pthread_t; 25 typedef CRITICAL_SECTION pthread_mutex_t; 26 27 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 28 #define USE_WINDOWS_CONDITION_VARIABLE 29 typedef CONDITION_VARIABLE pthread_cond_t; 30 #else 31 typedef struct { 32 HANDLE waiting_sem_; 33 HANDLE received_sem_; 34 HANDLE signal_event_; 35 } pthread_cond_t; 36 #endif // _WIN32_WINNT >= 0x600 37 38 #ifndef WINAPI_FAMILY_PARTITION 39 #define WINAPI_PARTITION_DESKTOP 1 40 #define WINAPI_FAMILY_PARTITION(x) x 41 #endif 42 43 #if !WINAPI_FAMILY_PARTITION(WINAPI_PARTITION_DESKTOP) 44 #define USE_CREATE_THREAD 45 #endif 46 47 #else // !_WIN32 48 49 #include <pthread.h> 50 51 #endif // _WIN32 52 53 typedef struct { 54 pthread_mutex_t mutex_; 55 pthread_cond_t condition_; 56 pthread_t thread_; 57 } WebPWorkerImpl; 58 59 #if defined(_WIN32) 60 61 //------------------------------------------------------------------------------ 62 // simplistic pthread emulation layer 63 64 #include <process.h> 65 66 // _beginthreadex requires __stdcall 67 #define THREADFN unsigned int __stdcall 68 #define THREAD_RETURN(val) (unsigned int)((DWORD_PTR)val) 69 70 #if _WIN32_WINNT >= 0x0501 // Windows XP or greater 71 #define WaitForSingleObject(obj, timeout) \ 72 WaitForSingleObjectEx(obj, timeout, FALSE /*bAlertable*/) 73 #endif 74 75 static int pthread_create(pthread_t* const thread, const void* attr, 76 unsigned int (__stdcall* start)(void*), void* arg) { 77 (void)attr; 78 #ifdef USE_CREATE_THREAD 79 *thread = CreateThread(NULL, /* lpThreadAttributes */ 80 0, /* dwStackSize */ 81 start, 82 arg, 83 0, /* dwStackSize */ 84 NULL); /* lpThreadId */ 85 #else 86 *thread = (pthread_t)_beginthreadex(NULL, /* void *security */ 87 0, /* unsigned stack_size */ 88 start, 89 arg, 90 0, /* unsigned initflag */ 91 NULL); /* unsigned *thrdaddr */ 92 #endif 93 if (*thread == NULL) return 1; 94 SetThreadPriority(*thread, THREAD_PRIORITY_ABOVE_NORMAL); 95 return 0; 96 } 97 98 static int pthread_join(pthread_t thread, void** value_ptr) { 99 (void)value_ptr; 100 return (WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0 || 101 CloseHandle(thread) == 0); 102 } 103 104 // Mutex 105 static int pthread_mutex_init(pthread_mutex_t* const mutex, void* mutexattr) { 106 (void)mutexattr; 107 #if _WIN32_WINNT >= 0x0600 // Windows Vista / Server 2008 or greater 108 InitializeCriticalSectionEx(mutex, 0 /*dwSpinCount*/, 0 /*Flags*/); 109 #else 110 InitializeCriticalSection(mutex); 111 #endif 112 return 0; 113 } 114 115 static int pthread_mutex_lock(pthread_mutex_t* const mutex) { 116 EnterCriticalSection(mutex); 117 return 0; 118 } 119 120 static int pthread_mutex_unlock(pthread_mutex_t* const mutex) { 121 LeaveCriticalSection(mutex); 122 return 0; 123 } 124 125 static int pthread_mutex_destroy(pthread_mutex_t* const mutex) { 126 DeleteCriticalSection(mutex); 127 return 0; 128 } 129 130 // Condition 131 static int pthread_cond_destroy(pthread_cond_t* const condition) { 132 int ok = 1; 133 #ifdef USE_WINDOWS_CONDITION_VARIABLE 134 (void)condition; 135 #else 136 ok &= (CloseHandle(condition->waiting_sem_) != 0); 137 ok &= (CloseHandle(condition->received_sem_) != 0); 138 ok &= (CloseHandle(condition->signal_event_) != 0); 139 #endif 140 return !ok; 141 } 142 143 static int pthread_cond_init(pthread_cond_t* const condition, void* cond_attr) { 144 (void)cond_attr; 145 #ifdef USE_WINDOWS_CONDITION_VARIABLE 146 InitializeConditionVariable(condition); 147 #else 148 condition->waiting_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 149 condition->received_sem_ = CreateSemaphore(NULL, 0, 1, NULL); 150 condition->signal_event_ = CreateEvent(NULL, FALSE, FALSE, NULL); 151 if (condition->waiting_sem_ == NULL || 152 condition->received_sem_ == NULL || 153 condition->signal_event_ == NULL) { 154 pthread_cond_destroy(condition); 155 return 1; 156 } 157 #endif 158 return 0; 159 } 160 161 static int pthread_cond_signal(pthread_cond_t* const condition) { 162 int ok = 1; 163 #ifdef USE_WINDOWS_CONDITION_VARIABLE 164 WakeConditionVariable(condition); 165 #else 166 if (WaitForSingleObject(condition->waiting_sem_, 0) == WAIT_OBJECT_0) { 167 // a thread is waiting in pthread_cond_wait: allow it to be notified 168 ok = SetEvent(condition->signal_event_); 169 // wait until the event is consumed so the signaler cannot consume 170 // the event via its own pthread_cond_wait. 171 ok &= (WaitForSingleObject(condition->received_sem_, INFINITE) != 172 WAIT_OBJECT_0); 173 } 174 #endif 175 return !ok; 176 } 177 178 static int pthread_cond_wait(pthread_cond_t* const condition, 179 pthread_mutex_t* const mutex) { 180 int ok; 181 #ifdef USE_WINDOWS_CONDITION_VARIABLE 182 ok = SleepConditionVariableCS(condition, mutex, INFINITE); 183 #else 184 // note that there is a consumer available so the signal isn't dropped in 185 // pthread_cond_signal 186 if (!ReleaseSemaphore(condition->waiting_sem_, 1, NULL)) return 1; 187 // now unlock the mutex so pthread_cond_signal may be issued 188 pthread_mutex_unlock(mutex); 189 ok = (WaitForSingleObject(condition->signal_event_, INFINITE) == 190 WAIT_OBJECT_0); 191 ok &= ReleaseSemaphore(condition->received_sem_, 1, NULL); 192 pthread_mutex_lock(mutex); 193 #endif 194 return !ok; 195 } 196 197 #else // !_WIN32 198 # define THREADFN void* 199 # define THREAD_RETURN(val) val 200 #endif // _WIN32 201 202 //------------------------------------------------------------------------------ 203 204 static THREADFN ThreadLoop(void* ptr) { 205 WebPWorker* const worker = (WebPWorker*)ptr; 206 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 207 int done = 0; 208 while (!done) { 209 pthread_mutex_lock(&impl->mutex_); 210 while (worker->status_ == OK) { // wait in idling mode 211 pthread_cond_wait(&impl->condition_, &impl->mutex_); 212 } 213 if (worker->status_ == WORK) { 214 WebPGetWorkerInterface()->Execute(worker); 215 worker->status_ = OK; 216 } else if (worker->status_ == NOT_OK) { // finish the worker 217 done = 1; 218 } 219 // signal to the main thread that we're done (for Sync()) 220 // Note the associated mutex does not need to be held when signaling the 221 // condition. Unlocking the mutex first may improve performance in some 222 // implementations, avoiding the case where the waiting thread can't 223 // reacquire the mutex when woken. 224 pthread_mutex_unlock(&impl->mutex_); 225 pthread_cond_signal(&impl->condition_); 226 } 227 return THREAD_RETURN(NULL); // Thread is finished 228 } 229 230 // main thread state control 231 static void ChangeState(WebPWorker* const worker, WebPWorkerStatus new_status) { 232 // No-op when attempting to change state on a thread that didn't come up. 233 // Checking status_ without acquiring the lock first would result in a data 234 // race. 235 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 236 if (impl == NULL) return; 237 238 pthread_mutex_lock(&impl->mutex_); 239 if (worker->status_ >= OK) { 240 // wait for the worker to finish 241 while (worker->status_ != OK) { 242 pthread_cond_wait(&impl->condition_, &impl->mutex_); 243 } 244 // assign new status and release the working thread if needed 245 if (new_status != OK) { 246 worker->status_ = new_status; 247 // Note the associated mutex does not need to be held when signaling the 248 // condition. Unlocking the mutex first may improve performance in some 249 // implementations, avoiding the case where the waiting thread can't 250 // reacquire the mutex when woken. 251 pthread_mutex_unlock(&impl->mutex_); 252 pthread_cond_signal(&impl->condition_); 253 return; 254 } 255 } 256 pthread_mutex_unlock(&impl->mutex_); 257 } 258 259 #endif // WEBP_USE_THREAD 260 261 //------------------------------------------------------------------------------ 262 263 static void Init(WebPWorker* const worker) { 264 memset(worker, 0, sizeof(*worker)); 265 worker->status_ = NOT_OK; 266 } 267 268 static int Sync(WebPWorker* const worker) { 269 #ifdef WEBP_USE_THREAD 270 ChangeState(worker, OK); 271 #endif 272 assert(worker->status_ <= OK); 273 return !worker->had_error; 274 } 275 276 static int Reset(WebPWorker* const worker) { 277 int ok = 1; 278 worker->had_error = 0; 279 if (worker->status_ < OK) { 280 #ifdef WEBP_USE_THREAD 281 WebPWorkerImpl* const impl = 282 (WebPWorkerImpl*)WebPSafeCalloc(1, sizeof(WebPWorkerImpl)); 283 worker->impl_ = (void*)impl; 284 if (worker->impl_ == NULL) { 285 return 0; 286 } 287 if (pthread_mutex_init(&impl->mutex_, NULL)) { 288 goto Error; 289 } 290 if (pthread_cond_init(&impl->condition_, NULL)) { 291 pthread_mutex_destroy(&impl->mutex_); 292 goto Error; 293 } 294 pthread_mutex_lock(&impl->mutex_); 295 ok = !pthread_create(&impl->thread_, NULL, ThreadLoop, worker); 296 if (ok) worker->status_ = OK; 297 pthread_mutex_unlock(&impl->mutex_); 298 if (!ok) { 299 pthread_mutex_destroy(&impl->mutex_); 300 pthread_cond_destroy(&impl->condition_); 301 Error: 302 WebPSafeFree(impl); 303 worker->impl_ = NULL; 304 return 0; 305 } 306 #else 307 worker->status_ = OK; 308 #endif 309 } else if (worker->status_ > OK) { 310 ok = Sync(worker); 311 } 312 assert(!ok || (worker->status_ == OK)); 313 return ok; 314 } 315 316 static void Execute(WebPWorker* const worker) { 317 if (worker->hook != NULL) { 318 worker->had_error |= !worker->hook(worker->data1, worker->data2); 319 } 320 } 321 322 static void Launch(WebPWorker* const worker) { 323 #ifdef WEBP_USE_THREAD 324 ChangeState(worker, WORK); 325 #else 326 Execute(worker); 327 #endif 328 } 329 330 static void End(WebPWorker* const worker) { 331 #ifdef WEBP_USE_THREAD 332 if (worker->impl_ != NULL) { 333 WebPWorkerImpl* const impl = (WebPWorkerImpl*)worker->impl_; 334 ChangeState(worker, NOT_OK); 335 pthread_join(impl->thread_, NULL); 336 pthread_mutex_destroy(&impl->mutex_); 337 pthread_cond_destroy(&impl->condition_); 338 WebPSafeFree(impl); 339 worker->impl_ = NULL; 340 } 341 #else 342 worker->status_ = NOT_OK; 343 assert(worker->impl_ == NULL); 344 #endif 345 assert(worker->status_ == NOT_OK); 346 } 347 348 //------------------------------------------------------------------------------ 349 350 static WebPWorkerInterface g_worker_interface = { 351 Init, Reset, Sync, Launch, Execute, End 352 }; 353 354 int WebPSetWorkerInterface(const WebPWorkerInterface* const winterface) { 355 if (winterface == NULL || 356 winterface->Init == NULL || winterface->Reset == NULL || 357 winterface->Sync == NULL || winterface->Launch == NULL || 358 winterface->Execute == NULL || winterface->End == NULL) { 359 return 0; 360 } 361 g_worker_interface = *winterface; 362 return 1; 363 } 364 365 const WebPWorkerInterface* WebPGetWorkerInterface(void) { 366 return &g_worker_interface; 367 } 368 369 //------------------------------------------------------------------------------ 370