1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/commands/command_queue.h"
6
7 #include <base/bind.h>
8 #include <base/time/time.h>
9
10 namespace weave {
11
12 namespace {
// Delay, in minutes, between a RemoveLater() request and the time at which
// the command becomes eligible for removal from the queue.
constexpr int kRemoveCommandDelayMin{5};
14
// Builds the lookup key used for command-specific handlers: the component
// path and the command name joined by a single ':' character.
std::string GetCommandHandlerKey(const std::string& component_path,
                                 const std::string& command_name) {
  std::string key;
  key.reserve(component_path.size() + 1 + command_name.size());
  key.append(component_path);
  key.push_back(':');
  key.append(command_name);
  return key;
}
19 }
20
CommandQueue(provider::TaskRunner * task_runner,base::Clock * clock)21 CommandQueue::CommandQueue(provider::TaskRunner* task_runner,
22 base::Clock* clock)
23 : task_runner_{task_runner}, clock_{clock} {}
24
AddCommandAddedCallback(const CommandCallback & callback)25 void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) {
26 on_command_added_.push_back(callback);
27 // Send all pre-existed commands.
28 for (const auto& command : map_)
29 callback.Run(command.second.get());
30 }
31
AddCommandRemovedCallback(const CommandCallback & callback)32 void CommandQueue::AddCommandRemovedCallback(const CommandCallback& callback) {
33 on_command_removed_.push_back(callback);
34 }
35
AddCommandHandler(const std::string & component_path,const std::string & command_name,const Device::CommandHandlerCallback & callback)36 void CommandQueue::AddCommandHandler(
37 const std::string& component_path,
38 const std::string& command_name,
39 const Device::CommandHandlerCallback& callback) {
40 if (!command_name.empty()) {
41 CHECK(default_command_callback_.is_null())
42 << "Commands specific handler are not allowed after default one";
43
44 for (const auto& command : map_) {
45 if (command.second->GetState() == Command::State::kQueued &&
46 command.second->GetName() == command_name &&
47 command.second->GetComponent() == component_path) {
48 callback.Run(command.second);
49 }
50 }
51
52 std::string key = GetCommandHandlerKey(component_path, command_name);
53 CHECK(command_callbacks_.insert(std::make_pair(key, callback)).second)
54 << command_name << " already has handler";
55
56 } else {
57 CHECK(component_path.empty())
58 << "Default handler must not be component-specific";
59 for (const auto& command : map_) {
60 std::string key = GetCommandHandlerKey(command.second->GetComponent(),
61 command.second->GetName());
62 if (command.second->GetState() == Command::State::kQueued &&
63 command_callbacks_.find(key) == command_callbacks_.end()) {
64 callback.Run(command.second);
65 }
66 }
67
68 CHECK(default_command_callback_.is_null()) << "Already has default handler";
69 default_command_callback_ = callback;
70 }
71 }
72
Add(std::unique_ptr<CommandInstance> instance)73 void CommandQueue::Add(std::unique_ptr<CommandInstance> instance) {
74 std::string id = instance->GetID();
75 LOG_IF(FATAL, id.empty()) << "Command has no ID";
76 instance->AttachToQueue(this);
77 auto pair = map_.insert(std::make_pair(id, std::move(instance)));
78 LOG_IF(FATAL, !pair.second) << "Command with ID '" << id
79 << "' is already in the queue";
80 for (const auto& cb : on_command_added_)
81 cb.Run(pair.first->second.get());
82
83 std::string key = GetCommandHandlerKey(pair.first->second->GetComponent(),
84 pair.first->second->GetName());
85 auto it_handler = command_callbacks_.find(key);
86
87 if (it_handler != command_callbacks_.end())
88 it_handler->second.Run(pair.first->second);
89 else if (!default_command_callback_.is_null())
90 default_command_callback_.Run(pair.first->second);
91 }
92
RemoveLater(const std::string & id)93 void CommandQueue::RemoveLater(const std::string& id) {
94 auto p = map_.find(id);
95 if (p == map_.end())
96 return;
97 auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin);
98 remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id));
99 if (remove_queue_.size() == 1) {
100 // The queue was empty, this is the first command to be removed, schedule
101 // a clean-up task.
102 ScheduleCleanup(remove_delay);
103 }
104 }
105
Remove(const std::string & id)106 bool CommandQueue::Remove(const std::string& id) {
107 auto p = map_.find(id);
108 if (p == map_.end())
109 return false;
110 std::shared_ptr<CommandInstance> instance = p->second;
111 instance->DetachFromQueue();
112 map_.erase(p);
113 for (const auto& cb : on_command_removed_)
114 cb.Run(instance.get());
115 return true;
116 }
117
Cleanup(const base::Time & cutoff_time)118 void CommandQueue::Cleanup(const base::Time& cutoff_time) {
119 while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) {
120 Remove(remove_queue_.top().second);
121 remove_queue_.pop();
122 }
123 }
124
ScheduleCleanup(base::TimeDelta delay)125 void CommandQueue::ScheduleCleanup(base::TimeDelta delay) {
126 task_runner_->PostDelayedTask(
127 FROM_HERE,
128 base::Bind(&CommandQueue::PerformScheduledCleanup,
129 weak_ptr_factory_.GetWeakPtr()),
130 delay);
131 }
132
PerformScheduledCleanup()133 void CommandQueue::PerformScheduledCleanup() {
134 base::Time now = clock_->Now();
135 Cleanup(now);
136 if (!remove_queue_.empty())
137 ScheduleCleanup(remove_queue_.top().first - now);
138 }
139
Find(const std::string & id) const140 CommandInstance* CommandQueue::Find(const std::string& id) const {
141 auto p = map_.find(id);
142 return (p != map_.end()) ? p->second.get() : nullptr;
143 }
144
145 } // namespace weave
146