1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/ports/message_queue.h"
6 
7 #include <algorithm>
8 
9 #include "base/logging.h"
10 #include "mojo/core/ports/message_filter.h"
11 
12 namespace mojo {
13 namespace core {
14 namespace ports {
15 
16 // Used by std::{push,pop}_heap functions
operator <(const std::unique_ptr<UserMessageEvent> & a,const std::unique_ptr<UserMessageEvent> & b)17 inline bool operator<(const std::unique_ptr<UserMessageEvent>& a,
18                       const std::unique_ptr<UserMessageEvent>& b) {
19   return a->sequence_num() > b->sequence_num();
20 }
21 
MessageQueue()22 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
23 
MessageQueue(uint64_t next_sequence_num)24 MessageQueue::MessageQueue(uint64_t next_sequence_num)
25     : next_sequence_num_(next_sequence_num) {
26   // The message queue is blocked waiting for a message with sequence number
27   // equal to |next_sequence_num|.
28 }
29 
~MessageQueue()30 MessageQueue::~MessageQueue() {
31 #if DCHECK_IS_ON()
32   size_t num_leaked_ports = 0;
33   for (const auto& message : heap_)
34     num_leaked_ports += message->num_ports();
35   DVLOG_IF(1, num_leaked_ports > 0)
36       << "Leaking " << num_leaked_ports << " ports in unreceived messages";
37 #endif
38 }
39 
HasNextMessage() const40 bool MessageQueue::HasNextMessage() const {
41   return !heap_.empty() && heap_[0]->sequence_num() == next_sequence_num_;
42 }
43 
GetNextMessage(std::unique_ptr<UserMessageEvent> * message,MessageFilter * filter)44 void MessageQueue::GetNextMessage(std::unique_ptr<UserMessageEvent>* message,
45                                   MessageFilter* filter) {
46   if (!HasNextMessage() || (filter && !filter->Match(*heap_[0]))) {
47     message->reset();
48     return;
49   }
50 
51   std::pop_heap(heap_.begin(), heap_.end());
52   *message = std::move(heap_.back());
53   total_queued_bytes_ -= (*message)->GetSizeIfSerialized();
54   heap_.pop_back();
55 
56   next_sequence_num_++;
57 }
58 
AcceptMessage(std::unique_ptr<UserMessageEvent> message,bool * has_next_message)59 void MessageQueue::AcceptMessage(std::unique_ptr<UserMessageEvent> message,
60                                  bool* has_next_message) {
61   // TODO: Handle sequence number roll-over.
62 
63   total_queued_bytes_ += message->GetSizeIfSerialized();
64   heap_.emplace_back(std::move(message));
65   std::push_heap(heap_.begin(), heap_.end());
66 
67   if (!signalable_) {
68     *has_next_message = false;
69   } else {
70     *has_next_message = (heap_[0]->sequence_num() == next_sequence_num_);
71   }
72 }
73 
TakeAllMessages(std::vector<std::unique_ptr<UserMessageEvent>> * messages)74 void MessageQueue::TakeAllMessages(
75     std::vector<std::unique_ptr<UserMessageEvent>>* messages) {
76   *messages = std::move(heap_);
77   total_queued_bytes_ = 0;
78 }
79 
80 }  // namespace ports
81 }  // namespace core
82 }  // namespace mojo
83