1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "webrtc/base/thread.h"
12 
13 #ifndef __has_feature
14 #define __has_feature(x) 0  // Compatibility with non-clang or LLVM compilers.
15 #endif  // __has_feature
16 
17 #if defined(WEBRTC_WIN)
18 #include <comdef.h>
19 #elif defined(WEBRTC_POSIX)
20 #include <time.h>
21 #endif
22 
23 #include "webrtc/base/common.h"
24 #include "webrtc/base/logging.h"
25 #include "webrtc/base/platform_thread.h"
26 #include "webrtc/base/stringutils.h"
27 #include "webrtc/base/timeutils.h"
28 
29 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
30 #include "webrtc/base/maccocoathreadhelper.h"
31 #include "webrtc/base/scoped_autorelease_pool.h"
32 #endif
33 
34 #include "webrtc/base/trace_event.h"
35 
36 namespace rtc {
37 
Instance()38 ThreadManager* ThreadManager::Instance() {
39   RTC_DEFINE_STATIC_LOCAL(ThreadManager, thread_manager, ());
40   return &thread_manager;
41 }
42 
43 // static
Current()44 Thread* Thread::Current() {
45   return ThreadManager::Instance()->CurrentThread();
46 }
47 
48 #if defined(WEBRTC_POSIX)
ThreadManager()49 ThreadManager::ThreadManager() {
50   pthread_key_create(&key_, NULL);
51 #ifndef NO_MAIN_THREAD_WRAPPING
52   WrapCurrentThread();
53 #endif
54 #if !__has_feature(objc_arc) && (defined(WEBRTC_MAC))
55   // Under Automatic Reference Counting (ARC), you cannot use autorelease pools
56   // directly. Instead, you use @autoreleasepool blocks instead.  Also, we are
57   // maintaining thread safety using immutability within context of GCD dispatch
58   // queues in this case.
59   InitCocoaMultiThreading();
60 #endif
61 }
62 
~ThreadManager()63 ThreadManager::~ThreadManager() {
64 #if __has_feature(objc_arc)
65   @autoreleasepool
66 #elif defined(WEBRTC_MAC)
67   // This is called during exit, at which point apparently no NSAutoreleasePools
68   // are available; but we might still need them to do cleanup (or we get the
69   // "no autoreleasepool in place, just leaking" warning when exiting).
70   ScopedAutoreleasePool pool;
71 #endif
72   {
73     UnwrapCurrentThread();
74     pthread_key_delete(key_);
75   }
76 }
77 
CurrentThread()78 Thread *ThreadManager::CurrentThread() {
79   return static_cast<Thread *>(pthread_getspecific(key_));
80 }
81 
SetCurrentThread(Thread * thread)82 void ThreadManager::SetCurrentThread(Thread *thread) {
83   pthread_setspecific(key_, thread);
84 }
85 #endif
86 
87 #if defined(WEBRTC_WIN)
ThreadManager()88 ThreadManager::ThreadManager() {
89   key_ = TlsAlloc();
90 #ifndef NO_MAIN_THREAD_WRAPPING
91   WrapCurrentThread();
92 #endif
93 }
94 
~ThreadManager()95 ThreadManager::~ThreadManager() {
96   UnwrapCurrentThread();
97   TlsFree(key_);
98 }
99 
CurrentThread()100 Thread *ThreadManager::CurrentThread() {
101   return static_cast<Thread *>(TlsGetValue(key_));
102 }
103 
SetCurrentThread(Thread * thread)104 void ThreadManager::SetCurrentThread(Thread *thread) {
105   TlsSetValue(key_, thread);
106 }
107 #endif
108 
WrapCurrentThread()109 Thread *ThreadManager::WrapCurrentThread() {
110   Thread* result = CurrentThread();
111   if (NULL == result) {
112     result = new Thread();
113     result->WrapCurrentWithThreadManager(this, true);
114   }
115   return result;
116 }
117 
UnwrapCurrentThread()118 void ThreadManager::UnwrapCurrentThread() {
119   Thread* t = CurrentThread();
120   if (t && !(t->IsOwned())) {
121     t->UnwrapCurrent();
122     delete t;
123   }
124 }
125 
126 struct ThreadInit {
127   Thread* thread;
128   Runnable* runnable;
129 };
130 
ScopedDisallowBlockingCalls()131 Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls()
132   : thread_(Thread::Current()),
133     previous_state_(thread_->SetAllowBlockingCalls(false)) {
134 }
135 
~ScopedDisallowBlockingCalls()136 Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() {
137   ASSERT(thread_->IsCurrent());
138   thread_->SetAllowBlockingCalls(previous_state_);
139 }
140 
Thread(SocketServer * ss)141 Thread::Thread(SocketServer* ss)
142     : MessageQueue(ss),
143       running_(true, false),
144 #if defined(WEBRTC_WIN)
145       thread_(NULL),
146       thread_id_(0),
147 #endif
148       owned_(true),
149       blocking_calls_allowed_(true) {
150   SetName("Thread", this);  // default name
151 }
152 
~Thread()153 Thread::~Thread() {
154   Stop();
155   Clear(NULL);
156 }
157 
SleepMs(int milliseconds)158 bool Thread::SleepMs(int milliseconds) {
159   AssertBlockingIsAllowedOnCurrentThread();
160 
161 #if defined(WEBRTC_WIN)
162   ::Sleep(milliseconds);
163   return true;
164 #else
165   // POSIX has both a usleep() and a nanosleep(), but the former is deprecated,
166   // so we use nanosleep() even though it has greater precision than necessary.
167   struct timespec ts;
168   ts.tv_sec = milliseconds / 1000;
169   ts.tv_nsec = (milliseconds % 1000) * 1000000;
170   int ret = nanosleep(&ts, NULL);
171   if (ret != 0) {
172     LOG_ERR(LS_WARNING) << "nanosleep() returning early";
173     return false;
174   }
175   return true;
176 #endif
177 }
178 
SetName(const std::string & name,const void * obj)179 bool Thread::SetName(const std::string& name, const void* obj) {
180   if (running()) return false;
181   name_ = name;
182   if (obj) {
183     char buf[16];
184     sprintfn(buf, sizeof(buf), " 0x%p", obj);
185     name_ += buf;
186   }
187   return true;
188 }
189 
Start(Runnable * runnable)190 bool Thread::Start(Runnable* runnable) {
191   ASSERT(owned_);
192   if (!owned_) return false;
193   ASSERT(!running());
194   if (running()) return false;
195 
196   Restart();  // reset fStop_ if the thread is being restarted
197 
198   // Make sure that ThreadManager is created on the main thread before
199   // we start a new thread.
200   ThreadManager::Instance();
201 
202   ThreadInit* init = new ThreadInit;
203   init->thread = this;
204   init->runnable = runnable;
205 #if defined(WEBRTC_WIN)
206   thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)PreRun, init, 0,
207                          &thread_id_);
208   if (thread_) {
209     running_.Set();
210   } else {
211     return false;
212   }
213 #elif defined(WEBRTC_POSIX)
214   pthread_attr_t attr;
215   pthread_attr_init(&attr);
216 
217   int error_code = pthread_create(&thread_, &attr, PreRun, init);
218   if (0 != error_code) {
219     LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
220     return false;
221   }
222   running_.Set();
223 #endif
224   return true;
225 }
226 
WrapCurrent()227 bool Thread::WrapCurrent() {
228   return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
229 }
230 
UnwrapCurrent()231 void Thread::UnwrapCurrent() {
232   // Clears the platform-specific thread-specific storage.
233   ThreadManager::Instance()->SetCurrentThread(NULL);
234 #if defined(WEBRTC_WIN)
235   if (thread_ != NULL) {
236     if (!CloseHandle(thread_)) {
237       LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
238     }
239     thread_ = NULL;
240   }
241 #endif
242   running_.Reset();
243 }
244 
SafeWrapCurrent()245 void Thread::SafeWrapCurrent() {
246   WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
247 }
248 
Join()249 void Thread::Join() {
250   if (running()) {
251     ASSERT(!IsCurrent());
252     if (Current() && !Current()->blocking_calls_allowed_) {
253       LOG(LS_WARNING) << "Waiting for the thread to join, "
254                       << "but blocking calls have been disallowed";
255     }
256 
257 #if defined(WEBRTC_WIN)
258     ASSERT(thread_ != NULL);
259     WaitForSingleObject(thread_, INFINITE);
260     CloseHandle(thread_);
261     thread_ = NULL;
262     thread_id_ = 0;
263 #elif defined(WEBRTC_POSIX)
264     void *pv;
265     pthread_join(thread_, &pv);
266 #endif
267     running_.Reset();
268   }
269 }
270 
SetAllowBlockingCalls(bool allow)271 bool Thread::SetAllowBlockingCalls(bool allow) {
272   ASSERT(IsCurrent());
273   bool previous = blocking_calls_allowed_;
274   blocking_calls_allowed_ = allow;
275   return previous;
276 }
277 
278 // static
AssertBlockingIsAllowedOnCurrentThread()279 void Thread::AssertBlockingIsAllowedOnCurrentThread() {
280 #if !defined(NDEBUG)
281   Thread* current = Thread::Current();
282   ASSERT(!current || current->blocking_calls_allowed_);
283 #endif
284 }
285 
PreRun(void * pv)286 void* Thread::PreRun(void* pv) {
287   ThreadInit* init = static_cast<ThreadInit*>(pv);
288   ThreadManager::Instance()->SetCurrentThread(init->thread);
289   rtc::SetCurrentThreadName(init->thread->name_.c_str());
290 #if __has_feature(objc_arc)
291   @autoreleasepool
292 #elif defined(WEBRTC_MAC)
293   // Make sure the new thread has an autoreleasepool
294   ScopedAutoreleasePool pool;
295 #endif
296   {
297     if (init->runnable) {
298       init->runnable->Run(init->thread);
299     } else {
300       init->thread->Run();
301     }
302     delete init;
303     return NULL;
304   }
305 }
306 
Run()307 void Thread::Run() {
308   ProcessMessages(kForever);
309 }
310 
IsOwned()311 bool Thread::IsOwned() {
312   return owned_;
313 }
314 
Stop()315 void Thread::Stop() {
316   MessageQueue::Quit();
317   Join();
318 }
319 
Send(MessageHandler * phandler,uint32_t id,MessageData * pdata)320 void Thread::Send(MessageHandler* phandler, uint32_t id, MessageData* pdata) {
321   if (fStop_)
322     return;
323 
324   // Sent messages are sent to the MessageHandler directly, in the context
325   // of "thread", like Win32 SendMessage. If in the right context,
326   // call the handler directly.
327   Message msg;
328   msg.phandler = phandler;
329   msg.message_id = id;
330   msg.pdata = pdata;
331   if (IsCurrent()) {
332     phandler->OnMessage(&msg);
333     return;
334   }
335 
336   AssertBlockingIsAllowedOnCurrentThread();
337 
338   AutoThread thread;
339   Thread *current_thread = Thread::Current();
340   ASSERT(current_thread != NULL);  // AutoThread ensures this
341 
342   bool ready = false;
343   {
344     CritScope cs(&crit_);
345     _SendMessage smsg;
346     smsg.thread = current_thread;
347     smsg.msg = msg;
348     smsg.ready = &ready;
349     sendlist_.push_back(smsg);
350   }
351 
352   // Wait for a reply
353 
354   ss_->WakeUp();
355 
356   bool waited = false;
357   crit_.Enter();
358   while (!ready) {
359     crit_.Leave();
360     // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
361     // thread invoking calls on the current thread.
362     current_thread->ReceiveSendsFromThread(this);
363     current_thread->socketserver()->Wait(kForever, false);
364     waited = true;
365     crit_.Enter();
366   }
367   crit_.Leave();
368 
369   // Our Wait loop above may have consumed some WakeUp events for this
370   // MessageQueue, that weren't relevant to this Send.  Losing these WakeUps can
371   // cause problems for some SocketServers.
372   //
373   // Concrete example:
374   // Win32SocketServer on thread A calls Send on thread B.  While processing the
375   // message, thread B Posts a message to A.  We consume the wakeup for that
376   // Post while waiting for the Send to complete, which means that when we exit
377   // this loop, we need to issue another WakeUp, or else the Posted message
378   // won't be processed in a timely manner.
379 
380   if (waited) {
381     current_thread->socketserver()->WakeUp();
382   }
383 }
384 
ReceiveSends()385 void Thread::ReceiveSends() {
386   ReceiveSendsFromThread(NULL);
387 }
388 
ReceiveSendsFromThread(const Thread * source)389 void Thread::ReceiveSendsFromThread(const Thread* source) {
390   // Receive a sent message. Cleanup scenarios:
391   // - thread sending exits: We don't allow this, since thread can exit
392   //   only via Join, so Send must complete.
393   // - thread receiving exits: Wakeup/set ready in Thread::Clear()
394   // - object target cleared: Wakeup/set ready in Thread::Clear()
395   _SendMessage smsg;
396 
397   crit_.Enter();
398   while (PopSendMessageFromThread(source, &smsg)) {
399     crit_.Leave();
400 
401     smsg.msg.phandler->OnMessage(&smsg.msg);
402 
403     crit_.Enter();
404     *smsg.ready = true;
405     smsg.thread->socketserver()->WakeUp();
406   }
407   crit_.Leave();
408 }
409 
PopSendMessageFromThread(const Thread * source,_SendMessage * msg)410 bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
411   for (std::list<_SendMessage>::iterator it = sendlist_.begin();
412        it != sendlist_.end(); ++it) {
413     if (it->thread == source || source == NULL) {
414       *msg = *it;
415       sendlist_.erase(it);
416       return true;
417     }
418   }
419   return false;
420 }
421 
InvokeBegin()422 void Thread::InvokeBegin() {
423   TRACE_EVENT_BEGIN0("webrtc", "Thread::Invoke");
424 }
425 
InvokeEnd()426 void Thread::InvokeEnd() {
427   TRACE_EVENT_END0("webrtc", "Thread::Invoke");
428 }
429 
Clear(MessageHandler * phandler,uint32_t id,MessageList * removed)430 void Thread::Clear(MessageHandler* phandler,
431                    uint32_t id,
432                    MessageList* removed) {
433   CritScope cs(&crit_);
434 
435   // Remove messages on sendlist_ with phandler
436   // Object target cleared: remove from send list, wakeup/set ready
437   // if sender not NULL.
438 
439   std::list<_SendMessage>::iterator iter = sendlist_.begin();
440   while (iter != sendlist_.end()) {
441     _SendMessage smsg = *iter;
442     if (smsg.msg.Match(phandler, id)) {
443       if (removed) {
444         removed->push_back(smsg.msg);
445       } else {
446         delete smsg.msg.pdata;
447       }
448       iter = sendlist_.erase(iter);
449       *smsg.ready = true;
450       smsg.thread->socketserver()->WakeUp();
451       continue;
452     }
453     ++iter;
454   }
455 
456   MessageQueue::Clear(phandler, id, removed);
457 }
458 
ProcessMessages(int cmsLoop)459 bool Thread::ProcessMessages(int cmsLoop) {
460   uint32_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
461   int cmsNext = cmsLoop;
462 
463   while (true) {
464 #if __has_feature(objc_arc)
465     @autoreleasepool
466 #elif defined(WEBRTC_MAC)
467     // see: http://developer.apple.com/library/mac/#documentation/Cocoa/Reference/Foundation/Classes/NSAutoreleasePool_Class/Reference/Reference.html
468     // Each thread is supposed to have an autorelease pool. Also for event loops
469     // like this, autorelease pool needs to be created and drained/released
470     // for each cycle.
471     ScopedAutoreleasePool pool;
472 #endif
473     {
474       Message msg;
475       if (!Get(&msg, cmsNext))
476         return !IsQuitting();
477       Dispatch(&msg);
478 
479       if (cmsLoop != kForever) {
480         cmsNext = TimeUntil(msEnd);
481         if (cmsNext < 0)
482           return true;
483       }
484     }
485   }
486 }
487 
WrapCurrentWithThreadManager(ThreadManager * thread_manager,bool need_synchronize_access)488 bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
489                                           bool need_synchronize_access) {
490   if (running())
491     return false;
492 
493 #if defined(WEBRTC_WIN)
494   if (need_synchronize_access) {
495     // We explicitly ask for no rights other than synchronization.
496     // This gives us the best chance of succeeding.
497     thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
498     if (!thread_) {
499       LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
500       return false;
501     }
502     thread_id_ = GetCurrentThreadId();
503   }
504 #elif defined(WEBRTC_POSIX)
505   thread_ = pthread_self();
506 #endif
507 
508   owned_ = false;
509   running_.Set();
510   thread_manager->SetCurrentThread(this);
511   return true;
512 }
513 
AutoThread(SocketServer * ss)514 AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
515   if (!ThreadManager::Instance()->CurrentThread()) {
516     ThreadManager::Instance()->SetCurrentThread(this);
517   }
518 }
519 
~AutoThread()520 AutoThread::~AutoThread() {
521   Stop();
522   if (ThreadManager::Instance()->CurrentThread() == this) {
523     ThreadManager::Instance()->SetCurrentThread(NULL);
524   }
525 }
526 
527 #if defined(WEBRTC_WIN)
Run()528 void ComThread::Run() {
529   HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
530   ASSERT(SUCCEEDED(hr));
531   if (SUCCEEDED(hr)) {
532     Thread::Run();
533     CoUninitialize();
534   } else {
535     LOG(LS_ERROR) << "CoInitialize failed, hr=" << hr;
536   }
537 }
538 #endif
539 
540 }  // namespace rtc
541