1 /*
2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #if defined(WEBRTC_POSIX)
12 #include <sys/time.h>
13 #endif
14 
15 #include <algorithm>
16 
17 #include "webrtc/base/common.h"
18 #include "webrtc/base/logging.h"
19 #include "webrtc/base/messagequeue.h"
20 #if defined(__native_client__)
21 #include "webrtc/base/nullsocketserver.h"
22 typedef rtc::NullSocketServer DefaultSocketServer;
23 #else
24 #include "webrtc/base/physicalsocketserver.h"
25 typedef rtc::PhysicalSocketServer DefaultSocketServer;
26 #endif
27 
28 namespace rtc {
29 
30 const uint32_t kMaxMsgLatency = 150;  // 150 ms
31 
32 //------------------------------------------------------------------
33 // MessageQueueManager
34 
35 MessageQueueManager* MessageQueueManager::instance_ = NULL;
36 
Instance()37 MessageQueueManager* MessageQueueManager::Instance() {
38   // Note: This is not thread safe, but it is first called before threads are
39   // spawned.
40   if (!instance_)
41     instance_ = new MessageQueueManager;
42   return instance_;
43 }
44 
IsInitialized()45 bool MessageQueueManager::IsInitialized() {
46   return instance_ != NULL;
47 }
48 
MessageQueueManager()49 MessageQueueManager::MessageQueueManager() {
50 }
51 
~MessageQueueManager()52 MessageQueueManager::~MessageQueueManager() {
53 }
54 
Add(MessageQueue * message_queue)55 void MessageQueueManager::Add(MessageQueue *message_queue) {
56   return Instance()->AddInternal(message_queue);
57 }
AddInternal(MessageQueue * message_queue)58 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
59   // MessageQueueManager methods should be non-reentrant, so we
60   // ASSERT that is the case.  If any of these ASSERT, please
61   // contact bpm or jbeda.
62 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
63   ASSERT(!crit_.CurrentThreadIsOwner());
64 #endif
65   CritScope cs(&crit_);
66   message_queues_.push_back(message_queue);
67 }
68 
Remove(MessageQueue * message_queue)69 void MessageQueueManager::Remove(MessageQueue *message_queue) {
70   // If there isn't a message queue manager instance, then there isn't a queue
71   // to remove.
72   if (!instance_) return;
73   return Instance()->RemoveInternal(message_queue);
74 }
RemoveInternal(MessageQueue * message_queue)75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
76 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
77   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
78 #endif
79   // If this is the last MessageQueue, destroy the manager as well so that
80   // we don't leak this object at program shutdown. As mentioned above, this is
81   // not thread-safe, but this should only happen at program termination (when
82   // the ThreadManager is destroyed, and threads are no longer active).
83   bool destroy = false;
84   {
85     CritScope cs(&crit_);
86     std::vector<MessageQueue *>::iterator iter;
87     iter = std::find(message_queues_.begin(), message_queues_.end(),
88                      message_queue);
89     if (iter != message_queues_.end()) {
90       message_queues_.erase(iter);
91     }
92     destroy = message_queues_.empty();
93   }
94   if (destroy) {
95     instance_ = NULL;
96     delete this;
97   }
98 }
99 
Clear(MessageHandler * handler)100 void MessageQueueManager::Clear(MessageHandler *handler) {
101   // If there isn't a message queue manager instance, then there aren't any
102   // queues to remove this handler from.
103   if (!instance_) return;
104   return Instance()->ClearInternal(handler);
105 }
ClearInternal(MessageHandler * handler)106 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
107 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
108   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
109 #endif
110   CritScope cs(&crit_);
111   std::vector<MessageQueue *>::iterator iter;
112   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113     (*iter)->Clear(handler);
114 }
115 
116 //------------------------------------------------------------------
117 // MessageQueue
118 
MessageQueue(SocketServer * ss)119 MessageQueue::MessageQueue(SocketServer* ss)
120     : ss_(ss), fStop_(false), fPeekKeep_(false),
121       dmsgq_next_num_(0) {
122   if (!ss_) {
123     // Currently, MessageQueue holds a socket server, and is the base class for
124     // Thread.  It seems like it makes more sense for Thread to hold the socket
125     // server, and provide it to the MessageQueue, since the Thread controls
126     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
127     // messagequeue_unittest to depend on network libraries... yuck.
128     default_ss_.reset(new DefaultSocketServer());
129     ss_ = default_ss_.get();
130   }
131   ss_->SetMessageQueue(this);
132   MessageQueueManager::Add(this);
133 }
134 
~MessageQueue()135 MessageQueue::~MessageQueue() {
136   // The signal is done from here to ensure
137   // that it always gets called when the queue
138   // is going away.
139   SignalQueueDestroyed();
140   MessageQueueManager::Remove(this);
141   Clear(NULL);
142   if (ss_) {
143     ss_->SetMessageQueue(NULL);
144   }
145 }
146 
set_socketserver(SocketServer * ss)147 void MessageQueue::set_socketserver(SocketServer* ss) {
148   ss_ = ss ? ss : default_ss_.get();
149   ss_->SetMessageQueue(this);
150 }
151 
Quit()152 void MessageQueue::Quit() {
153   fStop_ = true;
154   ss_->WakeUp();
155 }
156 
IsQuitting()157 bool MessageQueue::IsQuitting() {
158   return fStop_;
159 }
160 
Restart()161 void MessageQueue::Restart() {
162   fStop_ = false;
163 }
164 
Peek(Message * pmsg,int cmsWait)165 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
166   if (fPeekKeep_) {
167     *pmsg = msgPeek_;
168     return true;
169   }
170   if (!Get(pmsg, cmsWait))
171     return false;
172   msgPeek_ = *pmsg;
173   fPeekKeep_ = true;
174   return true;
175 }
176 
Get(Message * pmsg,int cmsWait,bool process_io)177 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
178   // Return and clear peek if present
179   // Always return the peek if it exists so there is Peek/Get symmetry
180 
181   if (fPeekKeep_) {
182     *pmsg = msgPeek_;
183     fPeekKeep_ = false;
184     return true;
185   }
186 
187   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
188 
189   int cmsTotal = cmsWait;
190   int cmsElapsed = 0;
191   uint32_t msStart = Time();
192   uint32_t msCurrent = msStart;
193   while (true) {
194     // Check for sent messages
195     ReceiveSends();
196 
197     // Check for posted events
198     int cmsDelayNext = kForever;
199     bool first_pass = true;
200     while (true) {
201       // All queue operations need to be locked, but nothing else in this loop
202       // (specifically handling disposed message) can happen inside the crit.
203       // Otherwise, disposed MessageHandlers will cause deadlocks.
204       {
205         CritScope cs(&crit_);
206         // On the first pass, check for delayed messages that have been
207         // triggered and calculate the next trigger time.
208         if (first_pass) {
209           first_pass = false;
210           while (!dmsgq_.empty()) {
211             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
212               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
213               break;
214             }
215             msgq_.push_back(dmsgq_.top().msg_);
216             dmsgq_.pop();
217           }
218         }
219         // Pull a message off the message queue, if available.
220         if (msgq_.empty()) {
221           break;
222         } else {
223           *pmsg = msgq_.front();
224           msgq_.pop_front();
225         }
226       }  // crit_ is released here.
227 
228       // Log a warning for time-sensitive messages that we're late to deliver.
229       if (pmsg->ts_sensitive) {
230         int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
231         if (delay > 0) {
232           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
233                             << (delay + kMaxMsgLatency) << "ms";
234         }
235       }
236       // If this was a dispose message, delete it and skip it.
237       if (MQID_DISPOSE == pmsg->message_id) {
238         ASSERT(NULL == pmsg->phandler);
239         delete pmsg->pdata;
240         *pmsg = Message();
241         continue;
242       }
243       return true;
244     }
245 
246     if (fStop_)
247       break;
248 
249     // Which is shorter, the delay wait or the asked wait?
250 
251     int cmsNext;
252     if (cmsWait == kForever) {
253       cmsNext = cmsDelayNext;
254     } else {
255       cmsNext = std::max(0, cmsTotal - cmsElapsed);
256       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
257         cmsNext = cmsDelayNext;
258     }
259 
260     // Wait and multiplex in the meantime
261     if (!ss_->Wait(cmsNext, process_io))
262       return false;
263 
264     // If the specified timeout expired, return
265 
266     msCurrent = Time();
267     cmsElapsed = TimeDiff(msCurrent, msStart);
268     if (cmsWait != kForever) {
269       if (cmsElapsed >= cmsWait)
270         return false;
271     }
272   }
273   return false;
274 }
275 
ReceiveSends()276 void MessageQueue::ReceiveSends() {
277 }
278 
Post(MessageHandler * phandler,uint32_t id,MessageData * pdata,bool time_sensitive)279 void MessageQueue::Post(MessageHandler* phandler,
280                         uint32_t id,
281                         MessageData* pdata,
282                         bool time_sensitive) {
283   if (fStop_)
284     return;
285 
286   // Keep thread safe
287   // Add the message to the end of the queue
288   // Signal for the multiplexer to return
289 
290   CritScope cs(&crit_);
291   Message msg;
292   msg.phandler = phandler;
293   msg.message_id = id;
294   msg.pdata = pdata;
295   if (time_sensitive) {
296     msg.ts_sensitive = Time() + kMaxMsgLatency;
297   }
298   msgq_.push_back(msg);
299   ss_->WakeUp();
300 }
301 
PostDelayed(int cmsDelay,MessageHandler * phandler,uint32_t id,MessageData * pdata)302 void MessageQueue::PostDelayed(int cmsDelay,
303                                MessageHandler* phandler,
304                                uint32_t id,
305                                MessageData* pdata) {
306   return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
307 }
308 
PostAt(uint32_t tstamp,MessageHandler * phandler,uint32_t id,MessageData * pdata)309 void MessageQueue::PostAt(uint32_t tstamp,
310                           MessageHandler* phandler,
311                           uint32_t id,
312                           MessageData* pdata) {
313   return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
314 }
315 
DoDelayPost(int cmsDelay,uint32_t tstamp,MessageHandler * phandler,uint32_t id,MessageData * pdata)316 void MessageQueue::DoDelayPost(int cmsDelay,
317                                uint32_t tstamp,
318                                MessageHandler* phandler,
319                                uint32_t id,
320                                MessageData* pdata) {
321   if (fStop_)
322     return;
323 
324   // Keep thread safe
325   // Add to the priority queue. Gets sorted soonest first.
326   // Signal for the multiplexer to return.
327 
328   CritScope cs(&crit_);
329   Message msg;
330   msg.phandler = phandler;
331   msg.message_id = id;
332   msg.pdata = pdata;
333   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
334   dmsgq_.push(dmsg);
335   // If this message queue processes 1 message every millisecond for 50 days,
336   // we will wrap this number.  Even then, only messages with identical times
337   // will be misordered, and then only briefly.  This is probably ok.
338   VERIFY(0 != ++dmsgq_next_num_);
339   ss_->WakeUp();
340 }
341 
GetDelay()342 int MessageQueue::GetDelay() {
343   CritScope cs(&crit_);
344 
345   if (!msgq_.empty())
346     return 0;
347 
348   if (!dmsgq_.empty()) {
349     int delay = TimeUntil(dmsgq_.top().msTrigger_);
350     if (delay < 0)
351       delay = 0;
352     return delay;
353   }
354 
355   return kForever;
356 }
357 
Clear(MessageHandler * phandler,uint32_t id,MessageList * removed)358 void MessageQueue::Clear(MessageHandler* phandler,
359                          uint32_t id,
360                          MessageList* removed) {
361   CritScope cs(&crit_);
362 
363   // Remove messages with phandler
364 
365   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
366     if (removed) {
367       removed->push_back(msgPeek_);
368     } else {
369       delete msgPeek_.pdata;
370     }
371     fPeekKeep_ = false;
372   }
373 
374   // Remove from ordered message queue
375 
376   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
377     if (it->Match(phandler, id)) {
378       if (removed) {
379         removed->push_back(*it);
380       } else {
381         delete it->pdata;
382       }
383       it = msgq_.erase(it);
384     } else {
385       ++it;
386     }
387   }
388 
389   // Remove from priority queue. Not directly iterable, so use this approach
390 
391   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
392   for (PriorityQueue::container_type::iterator it = new_end;
393        it != dmsgq_.container().end(); ++it) {
394     if (it->msg_.Match(phandler, id)) {
395       if (removed) {
396         removed->push_back(it->msg_);
397       } else {
398         delete it->msg_.pdata;
399       }
400     } else {
401       *new_end++ = *it;
402     }
403   }
404   dmsgq_.container().erase(new_end, dmsgq_.container().end());
405   dmsgq_.reheap();
406 }
407 
Dispatch(Message * pmsg)408 void MessageQueue::Dispatch(Message *pmsg) {
409   pmsg->phandler->OnMessage(pmsg);
410 }
411 
412 }  // namespace rtc
413