1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/commands/command_queue.h"
6 
7 #include <base/bind.h>
8 #include <base/time/time.h>
9 
10 namespace weave {
11 
12 namespace {
13 const int kRemoveCommandDelayMin = 5;
14 
GetCommandHandlerKey(const std::string & component_path,const std::string & command_name)15 std::string GetCommandHandlerKey(const std::string& component_path,
16                                  const std::string& command_name) {
17   return component_path + ":" + command_name;
18 }
19 }
20 
CommandQueue(provider::TaskRunner * task_runner,base::Clock * clock)21 CommandQueue::CommandQueue(provider::TaskRunner* task_runner,
22                            base::Clock* clock)
23     : task_runner_{task_runner}, clock_{clock} {}
24 
AddCommandAddedCallback(const CommandCallback & callback)25 void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) {
26   on_command_added_.push_back(callback);
27   // Send all pre-existed commands.
28   for (const auto& command : map_)
29     callback.Run(command.second.get());
30 }
31 
AddCommandRemovedCallback(const CommandCallback & callback)32 void CommandQueue::AddCommandRemovedCallback(const CommandCallback& callback) {
33   on_command_removed_.push_back(callback);
34 }
35 
AddCommandHandler(const std::string & component_path,const std::string & command_name,const Device::CommandHandlerCallback & callback)36 void CommandQueue::AddCommandHandler(
37     const std::string& component_path,
38     const std::string& command_name,
39     const Device::CommandHandlerCallback& callback) {
40   if (!command_name.empty()) {
41     CHECK(default_command_callback_.is_null())
42         << "Commands specific handler are not allowed after default one";
43 
44     for (const auto& command : map_) {
45       if (command.second->GetState() == Command::State::kQueued &&
46           command.second->GetName() == command_name &&
47           command.second->GetComponent() == component_path) {
48         callback.Run(command.second);
49       }
50     }
51 
52     std::string key = GetCommandHandlerKey(component_path, command_name);
53     CHECK(command_callbacks_.insert(std::make_pair(key, callback)).second)
54         << command_name << " already has handler";
55 
56   } else {
57     CHECK(component_path.empty())
58         << "Default handler must not be component-specific";
59     for (const auto& command : map_) {
60       std::string key = GetCommandHandlerKey(command.second->GetComponent(),
61                                              command.second->GetName());
62       if (command.second->GetState() == Command::State::kQueued &&
63           command_callbacks_.find(key) == command_callbacks_.end()) {
64         callback.Run(command.second);
65       }
66     }
67 
68     CHECK(default_command_callback_.is_null()) << "Already has default handler";
69     default_command_callback_ = callback;
70   }
71 }
72 
Add(std::unique_ptr<CommandInstance> instance)73 void CommandQueue::Add(std::unique_ptr<CommandInstance> instance) {
74   std::string id = instance->GetID();
75   LOG_IF(FATAL, id.empty()) << "Command has no ID";
76   instance->AttachToQueue(this);
77   auto pair = map_.insert(std::make_pair(id, std::move(instance)));
78   LOG_IF(FATAL, !pair.second) << "Command with ID '" << id
79                               << "' is already in the queue";
80   for (const auto& cb : on_command_added_)
81     cb.Run(pair.first->second.get());
82 
83   std::string key = GetCommandHandlerKey(pair.first->second->GetComponent(),
84                                          pair.first->second->GetName());
85   auto it_handler = command_callbacks_.find(key);
86 
87   if (it_handler != command_callbacks_.end())
88     it_handler->second.Run(pair.first->second);
89   else if (!default_command_callback_.is_null())
90     default_command_callback_.Run(pair.first->second);
91 }
92 
RemoveLater(const std::string & id)93 void CommandQueue::RemoveLater(const std::string& id) {
94   auto p = map_.find(id);
95   if (p == map_.end())
96     return;
97   auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin);
98   remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id));
99   if (remove_queue_.size() == 1) {
100     // The queue was empty, this is the first command to be removed, schedule
101     // a clean-up task.
102     ScheduleCleanup(remove_delay);
103   }
104 }
105 
Remove(const std::string & id)106 bool CommandQueue::Remove(const std::string& id) {
107   auto p = map_.find(id);
108   if (p == map_.end())
109     return false;
110   std::shared_ptr<CommandInstance> instance = p->second;
111   instance->DetachFromQueue();
112   map_.erase(p);
113   for (const auto& cb : on_command_removed_)
114     cb.Run(instance.get());
115   return true;
116 }
117 
Cleanup(const base::Time & cutoff_time)118 void CommandQueue::Cleanup(const base::Time& cutoff_time) {
119   while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) {
120     Remove(remove_queue_.top().second);
121     remove_queue_.pop();
122   }
123 }
124 
ScheduleCleanup(base::TimeDelta delay)125 void CommandQueue::ScheduleCleanup(base::TimeDelta delay) {
126   task_runner_->PostDelayedTask(
127       FROM_HERE,
128       base::Bind(&CommandQueue::PerformScheduledCleanup,
129                  weak_ptr_factory_.GetWeakPtr()),
130       delay);
131 }
132 
PerformScheduledCleanup()133 void CommandQueue::PerformScheduledCleanup() {
134   base::Time now = clock_->Now();
135   Cleanup(now);
136   if (!remove_queue_.empty())
137     ScheduleCleanup(remove_queue_.top().first - now);
138 }
139 
Find(const std::string & id) const140 CommandInstance* CommandQueue::Find(const std::string& id) const {
141   auto p = map_.find(id);
142   return (p != map_.end()) ? p->second.get() : nullptr;
143 }
144 
145 }  // namespace weave
146