1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef WEBRTC_BASE_THREAD_H_
12 #define WEBRTC_BASE_THREAD_H_
13 
14 #include <algorithm>
15 #include <list>
16 #include <string>
17 #include <vector>
18 
19 #if defined(WEBRTC_POSIX)
20 #include <pthread.h>
21 #endif
22 #include "webrtc/base/constructormagic.h"
23 #include "webrtc/base/event.h"
24 #include "webrtc/base/messagequeue.h"
25 
26 #if defined(WEBRTC_WIN)
27 #include "webrtc/base/win32.h"
28 #endif
29 
30 namespace rtc {
31 
32 class Thread;
33 
34 class ThreadManager {
35  public:
36   static const int kForever = -1;
37 
38   ThreadManager();
39   ~ThreadManager();
40 
41   static ThreadManager* Instance();
42 
43   Thread* CurrentThread();
44   void SetCurrentThread(Thread* thread);
45 
46   // Returns a thread object with its thread_ ivar set
47   // to whatever the OS uses to represent the thread.
48   // If there already *is* a Thread object corresponding to this thread,
49   // this method will return that.  Otherwise it creates a new Thread
50   // object whose wrapped() method will return true, and whose
51   // handle will, on Win32, be opened with only synchronization privileges -
52   // if you need more privilegs, rather than changing this method, please
53   // write additional code to adjust the privileges, or call a different
54   // factory method of your own devising, because this one gets used in
55   // unexpected contexts (like inside browser plugins) and it would be a
56   // shame to break it.  It is also conceivable on Win32 that we won't even
57   // be able to get synchronization privileges, in which case the result
58   // will have a NULL handle.
59   Thread *WrapCurrentThread();
60   void UnwrapCurrentThread();
61 
62  private:
63 #if defined(WEBRTC_POSIX)
64   pthread_key_t key_;
65 #endif
66 
67 #if defined(WEBRTC_WIN)
68   DWORD key_;
69 #endif
70 
71   RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
72 };
73 
74 struct _SendMessage {
_SendMessage_SendMessage75   _SendMessage() {}
76   Thread *thread;
77   Message msg;
78   bool *ready;
79 };
80 
81 class Runnable {
82  public:
~Runnable()83   virtual ~Runnable() {}
84   virtual void Run(Thread* thread) = 0;
85 
86  protected:
Runnable()87   Runnable() {}
88 
89  private:
90   RTC_DISALLOW_COPY_AND_ASSIGN(Runnable);
91 };
92 
93 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
94 
95 class Thread : public MessageQueue {
96  public:
97   explicit Thread(SocketServer* ss = NULL);
98   // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
99   // guarantee Stop() is explicitly called before the subclass is destroyed).
100   // This is required to avoid a data race between the destructor modifying the
101   // vtable, and the Thread::PreRun calling the virtual method Run().
102   ~Thread() override;
103 
104   static Thread* Current();
105 
106   // Used to catch performance regressions. Use this to disallow blocking calls
107   // (Invoke) for a given scope.  If a synchronous call is made while this is in
108   // effect, an assert will be triggered.
109   // Note that this is a single threaded class.
110   class ScopedDisallowBlockingCalls {
111    public:
112     ScopedDisallowBlockingCalls();
113     ~ScopedDisallowBlockingCalls();
114    private:
115     Thread* const thread_;
116     const bool previous_state_;
117   };
118 
IsCurrent()119   bool IsCurrent() const {
120     return Current() == this;
121   }
122 
123   // Sleeps the calling thread for the specified number of milliseconds, during
124   // which time no processing is performed. Returns false if sleeping was
125   // interrupted by a signal (POSIX only).
126   static bool SleepMs(int millis);
127 
128   // Sets the thread's name, for debugging. Must be called before Start().
129   // If |obj| is non-NULL, its value is appended to |name|.
name()130   const std::string& name() const { return name_; }
131   bool SetName(const std::string& name, const void* obj);
132 
133   // Starts the execution of the thread.
134   bool Start(Runnable* runnable = NULL);
135 
136   // Tells the thread to stop and waits until it is joined.
137   // Never call Stop on the current thread.  Instead use the inherited Quit
138   // function which will exit the base MessageQueue without terminating the
139   // underlying OS thread.
140   virtual void Stop();
141 
142   // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
143   // work, override Run().  To receive and dispatch messages, call
144   // ProcessMessages occasionally.
145   virtual void Run();
146 
147   virtual void Send(MessageHandler* phandler,
148                     uint32_t id = 0,
149                     MessageData* pdata = NULL);
150 
151   // Convenience method to invoke a functor on another thread.  Caller must
152   // provide the |ReturnT| template argument, which cannot (easily) be deduced.
153   // Uses Send() internally, which blocks the current thread until execution
154   // is complete.
155   // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool);
156   // NOTE: This function can only be called when synchronous calls are allowed.
157   // See ScopedDisallowBlockingCalls for details.
158   template <class ReturnT, class FunctorT>
Invoke(const FunctorT & functor)159   ReturnT Invoke(const FunctorT& functor) {
160     InvokeBegin();
161     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
162     Send(&handler);
163     InvokeEnd();
164     return handler.result();
165   }
166 
167   // From MessageQueue
168   void Clear(MessageHandler* phandler,
169              uint32_t id = MQID_ANY,
170              MessageList* removed = NULL) override;
171   void ReceiveSends() override;
172 
173   // ProcessMessages will process I/O and dispatch messages until:
174   //  1) cms milliseconds have elapsed (returns true)
175   //  2) Stop() is called (returns false)
176   bool ProcessMessages(int cms);
177 
178   // Returns true if this is a thread that we created using the standard
179   // constructor, false if it was created by a call to
180   // ThreadManager::WrapCurrentThread().  The main thread of an application
181   // is generally not owned, since the OS representation of the thread
182   // obviously exists before we can get to it.
183   // You cannot call Start on non-owned threads.
184   bool IsOwned();
185 
186 #if defined(WEBRTC_WIN)
GetHandle()187   HANDLE GetHandle() const {
188     return thread_;
189   }
GetId()190   DWORD GetId() const {
191     return thread_id_;
192   }
193 #elif defined(WEBRTC_POSIX)
GetPThread()194   pthread_t GetPThread() {
195     return thread_;
196   }
197 #endif
198 
199   // Expose private method running() for tests.
200   //
201   // DANGER: this is a terrible public API.  Most callers that might want to
202   // call this likely do not have enough control/knowledge of the Thread in
203   // question to guarantee that the returned value remains true for the duration
204   // of whatever code is conditionally executing because of the return value!
RunningForTest()205   bool RunningForTest() { return running(); }
206 
207   // Sets the per-thread allow-blocking-calls flag and returns the previous
208   // value. Must be called on this thread.
209   bool SetAllowBlockingCalls(bool allow);
210 
211   // These functions are public to avoid injecting test hooks. Don't call them
212   // outside of tests.
213   // This method should be called when thread is created using non standard
214   // method, like derived implementation of rtc::Thread and it can not be
215   // started by calling Start(). This will set started flag to true and
216   // owned to false. This must be called from the current thread.
217   bool WrapCurrent();
218   void UnwrapCurrent();
219 
220  protected:
221   // Same as WrapCurrent except that it never fails as it does not try to
222   // acquire the synchronization access of the thread. The caller should never
223   // call Stop() or Join() on this thread.
224   void SafeWrapCurrent();
225 
226   // Blocks the calling thread until this thread has terminated.
227   void Join();
228 
229   static void AssertBlockingIsAllowedOnCurrentThread();
230 
231   friend class ScopedDisallowBlockingCalls;
232 
233  private:
234   static void *PreRun(void *pv);
235 
236   // ThreadManager calls this instead WrapCurrent() because
237   // ThreadManager::Instance() cannot be used while ThreadManager is
238   // being created.
239   // The method tries to get synchronization rights of the thread on Windows if
240   // |need_synchronize_access| is true.
241   bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,
242                                     bool need_synchronize_access);
243 
244   // Return true if the thread was started and hasn't yet stopped.
running()245   bool running() { return running_.Wait(0); }
246 
247   // Processes received "Send" requests. If |source| is not NULL, only requests
248   // from |source| are processed, otherwise, all requests are processed.
249   void ReceiveSendsFromThread(const Thread* source);
250 
251   // If |source| is not NULL, pops the first "Send" message from |source| in
252   // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
253   // The caller must lock |crit_| before calling.
254   // Returns true if there is such a message.
255   bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
256 
257   // Used for tracking performance of Invoke calls.
258   void InvokeBegin();
259   void InvokeEnd();
260 
261   std::list<_SendMessage> sendlist_;
262   std::string name_;
263   Event running_;  // Signalled means running.
264 
265 #if defined(WEBRTC_POSIX)
266   pthread_t thread_;
267 #endif
268 
269 #if defined(WEBRTC_WIN)
270   HANDLE thread_;
271   DWORD thread_id_;
272 #endif
273 
274   bool owned_;
275   bool blocking_calls_allowed_;  // By default set to |true|.
276 
277   friend class ThreadManager;
278 
279   RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
280 };
281 
282 // AutoThread automatically installs itself at construction
283 // uninstalls at destruction, if a Thread object is
284 // _not already_ associated with the current OS thread.
285 
286 class AutoThread : public Thread {
287  public:
288   explicit AutoThread(SocketServer* ss = 0);
289   ~AutoThread() override;
290 
291  private:
292   RTC_DISALLOW_COPY_AND_ASSIGN(AutoThread);
293 };
294 
// Win32 extension for threads that need to use COM.
#if defined(WEBRTC_WIN)
class ComThread : public Thread {
 public:
  ComThread() {}
  // Calls Stop() here, in the subclass destructor, as Thread requires of all
  // of its subclasses (see the warning on Thread).
  ~ComThread() override { Stop(); }

 protected:
  // Overrides Thread::Run(); defined out of line.
  // NOTE(review): presumably sets up COM around the message loop - confirm
  // against the implementation file.
  void Run() override;

 private:
  RTC_DISALLOW_COPY_AND_ASSIGN(ComThread);
};
#endif
309 
310 // Provides an easy way to install/uninstall a socketserver on a thread.
311 class SocketServerScope {
312  public:
SocketServerScope(SocketServer * ss)313   explicit SocketServerScope(SocketServer* ss) {
314     old_ss_ = Thread::Current()->socketserver();
315     Thread::Current()->set_socketserver(ss);
316   }
~SocketServerScope()317   ~SocketServerScope() {
318     Thread::Current()->set_socketserver(old_ss_);
319   }
320 
321  private:
322   SocketServer* old_ss_;
323 
324   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope);
325 };
326 
327 }  // namespace rtc
328 
329 #endif  // WEBRTC_BASE_THREAD_H_
330