1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "src/commands/cloud_command_proxy.h"
6 
7 #include <base/bind.h>
8 #include <weave/enum_to_string.h>
9 #include <weave/provider/task_runner.h>
10 
11 #include "src/commands/command_instance.h"
12 #include "src/commands/schema_constants.h"
13 #include "src/utils.h"
14 
15 namespace weave {
16 
CloudCommandProxy(CommandInstance * command_instance,CloudCommandUpdateInterface * cloud_command_updater,ComponentManager * component_manager,std::unique_ptr<BackoffEntry> backoff_entry,provider::TaskRunner * task_runner)17 CloudCommandProxy::CloudCommandProxy(
18     CommandInstance* command_instance,
19     CloudCommandUpdateInterface* cloud_command_updater,
20     ComponentManager* component_manager,
21     std::unique_ptr<BackoffEntry> backoff_entry,
22     provider::TaskRunner* task_runner)
23     : command_instance_{command_instance},
24       cloud_command_updater_{cloud_command_updater},
25       component_manager_{component_manager},
26       task_runner_{task_runner},
27       cloud_backoff_entry_{std::move(backoff_entry)} {
28   callback_token_ = component_manager_->AddServerStateUpdatedCallback(
29       base::Bind(&CloudCommandProxy::OnDeviceStateUpdated,
30                  weak_ptr_factory_.GetWeakPtr()));
31   observer_.Add(command_instance);
32 }
33 
OnErrorChanged()34 void CloudCommandProxy::OnErrorChanged() {
35   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
36   patch->Set(commands::attributes::kCommand_Error,
37              command_instance_->GetError()
38                  ? ErrorInfoToJson(*command_instance_->GetError()).release()
39                  : base::Value::CreateNullValue().release());
40   QueueCommandUpdate(std::move(patch));
41 }
42 
OnResultsChanged()43 void CloudCommandProxy::OnResultsChanged() {
44   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
45   patch->Set(commands::attributes::kCommand_Results,
46              command_instance_->GetResults().CreateDeepCopy());
47   QueueCommandUpdate(std::move(patch));
48 }
49 
OnStateChanged()50 void CloudCommandProxy::OnStateChanged() {
51   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
52   patch->SetString(commands::attributes::kCommand_State,
53                    EnumToString(command_instance_->GetState()));
54   QueueCommandUpdate(std::move(patch));
55 }
56 
OnProgressChanged()57 void CloudCommandProxy::OnProgressChanged() {
58   std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
59   patch->Set(commands::attributes::kCommand_Progress,
60              command_instance_->GetProgress().CreateDeepCopy());
61   QueueCommandUpdate(std::move(patch));
62 }
63 
OnCommandDestroyed()64 void CloudCommandProxy::OnCommandDestroyed() {
65   delete this;
66 }
67 
QueueCommandUpdate(std::unique_ptr<base::DictionaryValue> patch)68 void CloudCommandProxy::QueueCommandUpdate(
69     std::unique_ptr<base::DictionaryValue> patch) {
70   ComponentManager::UpdateID id = component_manager_->GetLastStateChangeId();
71   if (update_queue_.empty() || update_queue_.back().first != id) {
72     // If queue is currently empty or the device state has changed since the
73     // last patch request queued, add a new request to the queue.
74     update_queue_.push_back(std::make_pair(id, std::move(patch)));
75   } else {
76     // Device state hasn't changed since the last time this command update
77     // was queued. We can coalesce the command update patches, unless the
78     // current request is already in flight to the server.
79     if (update_queue_.size() == 1 && command_update_in_progress_) {
80       // Can't update the request which is being sent to the server.
81       // Queue a new update.
82       update_queue_.push_back(std::make_pair(id, std::move(patch)));
83     } else {
84       // Coalesce the patches.
85       update_queue_.back().second->MergeDictionary(patch.get());
86     }
87   }
88   // Send out an update request to the server, if needed.
89 
90   // Post to accumulate more changes during the current message loop task run.
91   task_runner_->PostDelayedTask(
92       FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
93                             backoff_weak_ptr_factory_.GetWeakPtr()),
94       {});
95 }
96 
SendCommandUpdate()97 void CloudCommandProxy::SendCommandUpdate() {
98   if (command_update_in_progress_ || update_queue_.empty())
99     return;
100 
101   // Check if we have any pending updates ready to be sent to the server.
102   // We can only send updates for which the device state at the time the
103   // requests have been queued were successfully propagated to the server.
104   // That is, if the pending device state updates that we recorded while the
105   // command update was queued haven't been acknowledged by the server, we
106   // will hold the corresponding command updates until the related device state
107   // has been successfully updated on the server.
108   if (update_queue_.front().first > last_state_update_id_)
109     return;
110 
111   backoff_weak_ptr_factory_.InvalidateWeakPtrs();
112   if (cloud_backoff_entry_->ShouldRejectRequest()) {
113     VLOG(1) << "Cloud request delayed for "
114             << cloud_backoff_entry_->GetTimeUntilRelease()
115             << " due to backoff policy";
116     task_runner_->PostDelayedTask(
117         FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
118                               backoff_weak_ptr_factory_.GetWeakPtr()),
119         cloud_backoff_entry_->GetTimeUntilRelease());
120     return;
121   }
122 
123   // Coalesce any pending updates that were queued prior to the current device
124   // state known to be propagated to the server successfully.
125   auto iter = update_queue_.begin();
126   auto start = ++iter;
127   while (iter != update_queue_.end()) {
128     if (iter->first > last_state_update_id_)
129       break;
130     update_queue_.front().first = iter->first;
131     update_queue_.front().second->MergeDictionary(iter->second.get());
132     ++iter;
133   }
134   // Remove all the intermediate items that have been merged into the first
135   // entry.
136   update_queue_.erase(start, iter);
137   command_update_in_progress_ = true;
138   cloud_command_updater_->UpdateCommand(
139       command_instance_->GetID(), *update_queue_.front().second,
140       base::Bind(&CloudCommandProxy::OnUpdateCommandDone,
141                  weak_ptr_factory_.GetWeakPtr()));
142 }
143 
ResendCommandUpdate()144 void CloudCommandProxy::ResendCommandUpdate() {
145   command_update_in_progress_ = false;
146   SendCommandUpdate();
147 }
148 
OnUpdateCommandDone(ErrorPtr error)149 void CloudCommandProxy::OnUpdateCommandDone(ErrorPtr error) {
150   command_update_in_progress_ = false;
151   cloud_backoff_entry_->InformOfRequest(!error);
152   if (!error) {
153     // Remove the succeeded update from the queue.
154     update_queue_.pop_front();
155   }
156   // If we have more pending updates, send a new request to the server
157   // immediately, if possible.
158   SendCommandUpdate();
159 }
160 
OnDeviceStateUpdated(ComponentManager::UpdateID update_id)161 void CloudCommandProxy::OnDeviceStateUpdated(
162     ComponentManager::UpdateID update_id) {
163   last_state_update_id_ = update_id;
164   // Try to send out any queued command updates that could be performed after
165   // a device state is updated.
166   SendCommandUpdate();
167 }
168 
169 }  // namespace weave
170