1 // Copyright 2015 The Weave Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "src/device_registration_info.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <set>
10 #include <utility>
11 #include <vector>
12
13 #include <base/bind.h>
14 #include <base/json/json_reader.h>
15 #include <base/json/json_writer.h>
16 #include <base/strings/string_number_conversions.h>
17 #include <base/strings/stringprintf.h>
18 #include <base/values.h>
19 #include <weave/provider/http_client.h>
20 #include <weave/provider/network.h>
21 #include <weave/provider/task_runner.h>
22
23 #include "src/bind_lambda.h"
24 #include "src/commands/cloud_command_proxy.h"
25 #include "src/commands/schema_constants.h"
26 #include "src/data_encoding.h"
27 #include "src/http_constants.h"
28 #include "src/json_error_codes.h"
29 #include "src/notification/xmpp_channel.h"
30 #include "src/privet/auth_manager.h"
31 #include "src/string_utils.h"
32 #include "src/utils.h"
33
34 namespace weave {
35
36 const char kErrorAlreayRegistered[] = "already_registered";
37
38 namespace {
39
40 const int kPollingPeriodSeconds = 7;
41 const int kBackupPollingPeriodMinutes = 30;
42
43 namespace fetch_reason {
44
45 const char kDeviceStart[] = "device_start"; // Initial queue fetch at startup.
46 const char kRegularPull[] = "regular_pull"; // Regular fetch before XMPP is up.
47 const char kNewCommand[] = "new_command"; // A new command is available.
48 const char kJustInCase[] = "just_in_case"; // Backup fetch when XMPP is live.
49
50 } // namespace fetch_reason
51
52 using provider::HttpClient;
53
SetUnexpectedError(ErrorPtr * error)54 inline void SetUnexpectedError(ErrorPtr* error) {
55 Error::AddTo(error, FROM_HERE, "unexpected_response", "Unexpected GCD error");
56 }
57
ParseGCDError(const base::DictionaryValue * json,ErrorPtr * error)58 void ParseGCDError(const base::DictionaryValue* json, ErrorPtr* error) {
59 const base::Value* list_value = nullptr;
60 const base::ListValue* error_list = nullptr;
61 if (!json->Get("error.errors", &list_value) ||
62 !list_value->GetAsList(&error_list)) {
63 SetUnexpectedError(error);
64 return;
65 }
66
67 for (size_t i = 0; i < error_list->GetSize(); i++) {
68 const base::Value* error_value = nullptr;
69 const base::DictionaryValue* error_object = nullptr;
70 if (!error_list->Get(i, &error_value) ||
71 !error_value->GetAsDictionary(&error_object)) {
72 SetUnexpectedError(error);
73 continue;
74 }
75 std::string error_code, error_message;
76 if (error_object->GetString("reason", &error_code) &&
77 error_object->GetString("message", &error_message)) {
78 Error::AddTo(error, FROM_HERE, error_code, error_message);
79 } else {
80 SetUnexpectedError(error);
81 }
82 }
83 }
84
AppendQueryParams(const std::string & url,const WebParamList & params)85 std::string AppendQueryParams(const std::string& url,
86 const WebParamList& params) {
87 CHECK_EQ(std::string::npos, url.find_first_of("?#"));
88 if (params.empty())
89 return url;
90 return url + '?' + WebParamsEncode(params);
91 }
92
BuildURL(const std::string & url,const std::string & subpath,const WebParamList & params)93 std::string BuildURL(const std::string& url,
94 const std::string& subpath,
95 const WebParamList& params) {
96 std::string result = url;
97 if (!result.empty() && result.back() != '/' && !subpath.empty()) {
98 CHECK_NE('/', subpath.front());
99 result += '/';
100 }
101 result += subpath;
102 return AppendQueryParams(result, params);
103 }
104
IgnoreCloudErrorWithCallback(const base::Closure & cb,ErrorPtr)105 void IgnoreCloudErrorWithCallback(const base::Closure& cb, ErrorPtr) {
106 cb.Run();
107 }
108
IgnoreCloudError(ErrorPtr)109 void IgnoreCloudError(ErrorPtr) {}
110
IgnoreCloudResult(const base::DictionaryValue &,ErrorPtr error)111 void IgnoreCloudResult(const base::DictionaryValue&, ErrorPtr error) {}
112
IgnoreCloudResultWithCallback(const DoneCallback & cb,const base::DictionaryValue &,ErrorPtr error)113 void IgnoreCloudResultWithCallback(const DoneCallback& cb,
114 const base::DictionaryValue&,
115 ErrorPtr error) {
116 cb.Run(std::move(error));
117 }
118
119 class RequestSender final {
120 public:
RequestSender(HttpClient::Method method,const std::string & url,HttpClient * transport)121 RequestSender(HttpClient::Method method,
122 const std::string& url,
123 HttpClient* transport)
124 : method_{method}, url_{url}, transport_{transport} {}
125
Send(const HttpClient::SendRequestCallback & callback)126 void Send(const HttpClient::SendRequestCallback& callback) {
127 static int debug_id = 0;
128 ++debug_id;
129 VLOG(1) << "Sending request. id:" << debug_id
130 << " method:" << EnumToString(method_) << " url:" << url_;
131 VLOG(2) << "Request data: " << data_;
132 auto on_done = [](
133 int debug_id, const HttpClient::SendRequestCallback& callback,
134 std::unique_ptr<HttpClient::Response> response, ErrorPtr error) {
135 if (error) {
136 VLOG(1) << "Request failed, id=" << debug_id
137 << ", reason: " << error->GetCode()
138 << ", message: " << error->GetMessage();
139 return callback.Run({}, std::move(error));
140 }
141 VLOG(1) << "Request succeeded. id:" << debug_id
142 << " status:" << response->GetStatusCode();
143 VLOG(2) << "Response data: " << response->GetData();
144 callback.Run(std::move(response), nullptr);
145 };
146 transport_->SendRequest(method_, url_, GetFullHeaders(), data_,
147 base::Bind(on_done, debug_id, callback));
148 }
149
SetAccessToken(const std::string & access_token)150 void SetAccessToken(const std::string& access_token) {
151 access_token_ = access_token;
152 }
153
SetData(const std::string & data,const std::string & mime_type)154 void SetData(const std::string& data, const std::string& mime_type) {
155 data_ = data;
156 mime_type_ = mime_type;
157 }
158
SetFormData(const std::vector<std::pair<std::string,std::string>> & data)159 void SetFormData(
160 const std::vector<std::pair<std::string, std::string>>& data) {
161 SetData(WebParamsEncode(data), http::kWwwFormUrlEncoded);
162 }
163
SetJsonData(const base::Value & json)164 void SetJsonData(const base::Value& json) {
165 std::string data;
166 CHECK(base::JSONWriter::Write(json, &data));
167 SetData(data, http::kJsonUtf8);
168 }
169
170 private:
GetFullHeaders() const171 HttpClient::Headers GetFullHeaders() const {
172 HttpClient::Headers headers;
173 if (!access_token_.empty())
174 headers.emplace_back(http::kAuthorization, "Bearer " + access_token_);
175 if (!mime_type_.empty())
176 headers.emplace_back(http::kContentType, mime_type_);
177 return headers;
178 }
179
180 HttpClient::Method method_;
181 std::string url_;
182 std::string data_;
183 std::string mime_type_;
184 std::string access_token_;
185 HttpClient* transport_{nullptr};
186
187 DISALLOW_COPY_AND_ASSIGN(RequestSender);
188 };
189
ParseJsonResponse(const HttpClient::Response & response,ErrorPtr * error)190 std::unique_ptr<base::DictionaryValue> ParseJsonResponse(
191 const HttpClient::Response& response,
192 ErrorPtr* error) {
193 // Make sure we have a correct content type. Do not try to parse
194 // binary files, or HTML output. Limit to application/json and text/plain.
195 std::string content_type =
196 SplitAtFirst(response.GetContentType(), ";", true).first;
197
198 if (content_type != http::kJson && content_type != http::kPlain) {
199 return Error::AddTo(
200 error, FROM_HERE, "non_json_content_type",
201 "Unexpected content type: \'" + response.GetContentType() + "\'");
202 }
203
204 const std::string& json = response.GetData();
205 std::string error_message;
206 auto value = base::JSONReader::ReadAndReturnError(json, base::JSON_PARSE_RFC,
207 nullptr, &error_message);
208 if (!value) {
209 Error::AddToPrintf(error, FROM_HERE, errors::json::kParseError,
210 "Error '%s' occurred parsing JSON string '%s'",
211 error_message.c_str(), json.c_str());
212 return std::unique_ptr<base::DictionaryValue>();
213 }
214 base::DictionaryValue* dict_value = nullptr;
215 if (!value->GetAsDictionary(&dict_value)) {
216 Error::AddToPrintf(error, FROM_HERE, errors::json::kObjectExpected,
217 "Response is not a valid JSON object: '%s'",
218 json.c_str());
219 return std::unique_ptr<base::DictionaryValue>();
220 } else {
221 // |value| is now owned by |dict_value|, so release the scoped_ptr now.
222 base::IgnoreResult(value.release());
223 }
224 return std::unique_ptr<base::DictionaryValue>(dict_value);
225 }
226
IsSuccessful(const HttpClient::Response & response)227 bool IsSuccessful(const HttpClient::Response& response) {
228 int code = response.GetStatusCode();
229 return code >= http::kContinue && code < http::kBadRequest;
230 }
231
232 } // anonymous namespace
233
DeviceRegistrationInfo(Config * config,ComponentManager * component_manager,provider::TaskRunner * task_runner,provider::HttpClient * http_client,provider::Network * network,privet::AuthManager * auth_manager)234 DeviceRegistrationInfo::DeviceRegistrationInfo(
235 Config* config,
236 ComponentManager* component_manager,
237 provider::TaskRunner* task_runner,
238 provider::HttpClient* http_client,
239 provider::Network* network,
240 privet::AuthManager* auth_manager)
241 : http_client_{http_client},
242 task_runner_{task_runner},
243 config_{config},
244 component_manager_{component_manager},
245 network_{network},
246 auth_manager_{auth_manager} {
247 cloud_backoff_policy_.reset(new BackoffEntry::Policy{});
248 cloud_backoff_policy_->num_errors_to_ignore = 0;
249 cloud_backoff_policy_->initial_delay_ms = 1000;
250 cloud_backoff_policy_->multiply_factor = 2.0;
251 cloud_backoff_policy_->jitter_factor = 0.1;
252 cloud_backoff_policy_->maximum_backoff_ms = 30000;
253 cloud_backoff_policy_->entry_lifetime_ms = -1;
254 cloud_backoff_policy_->always_use_initial_delay = false;
255 cloud_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()});
256 oauth2_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()});
257
258 bool revoked =
259 !GetSettings().cloud_id.empty() && !HaveRegistrationCredentials();
260 gcd_state_ =
261 revoked ? GcdState::kInvalidCredentials : GcdState::kUnconfigured;
262
263 component_manager_->AddTraitDefChangedCallback(base::Bind(
264 &DeviceRegistrationInfo::OnTraitDefsChanged, weak_factory_.GetWeakPtr()));
265 component_manager_->AddComponentTreeChangedCallback(
266 base::Bind(&DeviceRegistrationInfo::OnComponentTreeChanged,
267 weak_factory_.GetWeakPtr()));
268 component_manager_->AddStateChangedCallback(base::Bind(
269 &DeviceRegistrationInfo::OnStateChanged, weak_factory_.GetWeakPtr()));
270 }
271
272 DeviceRegistrationInfo::~DeviceRegistrationInfo() = default;
273
GetServiceURL(const std::string & subpath,const WebParamList & params) const274 std::string DeviceRegistrationInfo::GetServiceURL(
275 const std::string& subpath,
276 const WebParamList& params) const {
277 return BuildURL(GetSettings().service_url, subpath, params);
278 }
279
GetDeviceURL(const std::string & subpath,const WebParamList & params) const280 std::string DeviceRegistrationInfo::GetDeviceURL(
281 const std::string& subpath,
282 const WebParamList& params) const {
283 CHECK(!GetSettings().cloud_id.empty()) << "Must have a valid device ID";
284 return BuildURL(GetSettings().service_url,
285 "devices/" + GetSettings().cloud_id + "/" + subpath, params);
286 }
287
GetOAuthURL(const std::string & subpath,const WebParamList & params) const288 std::string DeviceRegistrationInfo::GetOAuthURL(
289 const std::string& subpath,
290 const WebParamList& params) const {
291 return BuildURL(GetSettings().oauth_url, subpath, params);
292 }
293
Start()294 void DeviceRegistrationInfo::Start() {
295 if (HaveRegistrationCredentials()) {
296 StartNotificationChannel();
297 // Wait a significant amount of time for local daemons to publish their
298 // state to Buffet before publishing it to the cloud.
299 // TODO(wiley) We could do a lot of things here to either expose this
300 // timeout as a configurable knob or allow local
301 // daemons to signal that their state is up to date so that
302 // we need not wait for them.
303 ScheduleCloudConnection(base::TimeDelta::FromSeconds(5));
304 }
305 }
306
ScheduleCloudConnection(const base::TimeDelta & delay)307 void DeviceRegistrationInfo::ScheduleCloudConnection(
308 const base::TimeDelta& delay) {
309 SetGcdState(GcdState::kConnecting);
310 if (!task_runner_)
311 return; // Assume we're in test
312 task_runner_->PostDelayedTask(
313 FROM_HERE,
314 base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr(), nullptr),
315 delay);
316 }
317
HaveRegistrationCredentials() const318 bool DeviceRegistrationInfo::HaveRegistrationCredentials() const {
319 return !GetSettings().refresh_token.empty() &&
320 !GetSettings().cloud_id.empty() &&
321 !GetSettings().robot_account.empty();
322 }
323
VerifyRegistrationCredentials(ErrorPtr * error) const324 bool DeviceRegistrationInfo::VerifyRegistrationCredentials(
325 ErrorPtr* error) const {
326 const bool have_credentials = HaveRegistrationCredentials();
327
328 VLOG(2) << "Device registration record "
329 << ((have_credentials) ? "found" : "not found.");
330 if (!have_credentials) {
331 return Error::AddTo(error, FROM_HERE, "device_not_registered",
332 "No valid device registration record found");
333 }
334 return true;
335 }
336
337 std::unique_ptr<base::DictionaryValue>
ParseOAuthResponse(const HttpClient::Response & response,ErrorPtr * error)338 DeviceRegistrationInfo::ParseOAuthResponse(const HttpClient::Response& response,
339 ErrorPtr* error) {
340 int code = response.GetStatusCode();
341 auto resp = ParseJsonResponse(response, error);
342 if (resp && code >= http::kBadRequest) {
343 std::string error_code, error_message;
344 if (!resp->GetString("error", &error_code)) {
345 error_code = "unexpected_response";
346 }
347 if (error_code == "invalid_grant") {
348 LOG(INFO) << "The device's registration has been revoked.";
349 SetGcdState(GcdState::kInvalidCredentials);
350 }
351 // I have never actually seen an error_description returned.
352 if (!resp->GetString("error_description", &error_message)) {
353 error_message = "Unexpected OAuth error";
354 }
355 return Error::AddTo(error, FROM_HERE, error_code, error_message);
356 }
357 return resp;
358 }
359
RefreshAccessToken(const DoneCallback & callback)360 void DeviceRegistrationInfo::RefreshAccessToken(const DoneCallback& callback) {
361 LOG(INFO) << "Refreshing access token.";
362
363 ErrorPtr error;
364 if (!VerifyRegistrationCredentials(&error))
365 return callback.Run(std::move(error));
366
367 if (oauth2_backoff_entry_->ShouldRejectRequest()) {
368 VLOG(1) << "RefreshToken request delayed for "
369 << oauth2_backoff_entry_->GetTimeUntilRelease()
370 << " due to backoff policy";
371 task_runner_->PostDelayedTask(
372 FROM_HERE, base::Bind(&DeviceRegistrationInfo::RefreshAccessToken,
373 AsWeakPtr(), callback),
374 oauth2_backoff_entry_->GetTimeUntilRelease());
375 return;
376 }
377
378 RequestSender sender{HttpClient::Method::kPost, GetOAuthURL("token"),
379 http_client_};
380 sender.SetFormData({
381 {"refresh_token", GetSettings().refresh_token},
382 {"client_id", GetSettings().client_id},
383 {"client_secret", GetSettings().client_secret},
384 {"grant_type", "refresh_token"},
385 });
386 sender.Send(base::Bind(&DeviceRegistrationInfo::OnRefreshAccessTokenDone,
387 weak_factory_.GetWeakPtr(), callback));
388 VLOG(1) << "Refresh access token request dispatched";
389 }
390
OnRefreshAccessTokenDone(const DoneCallback & callback,std::unique_ptr<HttpClient::Response> response,ErrorPtr error)391 void DeviceRegistrationInfo::OnRefreshAccessTokenDone(
392 const DoneCallback& callback,
393 std::unique_ptr<HttpClient::Response> response,
394 ErrorPtr error) {
395 if (error) {
396 VLOG(1) << "Refresh access token failed";
397 oauth2_backoff_entry_->InformOfRequest(false);
398 return RefreshAccessToken(callback);
399 }
400 VLOG(1) << "Refresh access token request completed";
401 oauth2_backoff_entry_->InformOfRequest(true);
402 auto json = ParseOAuthResponse(*response, &error);
403 if (!json)
404 return callback.Run(std::move(error));
405
406 int expires_in = 0;
407 if (!json->GetString("access_token", &access_token_) ||
408 !json->GetInteger("expires_in", &expires_in) || access_token_.empty() ||
409 expires_in <= 0) {
410 LOG(ERROR) << "Access token unavailable.";
411 Error::AddTo(&error, FROM_HERE, "unexpected_server_response",
412 "Access token unavailable");
413 return callback.Run(std::move(error));
414 }
415 access_token_expiration_ =
416 base::Time::Now() + base::TimeDelta::FromSeconds(expires_in);
417 LOG(INFO) << "Access token is refreshed for additional " << expires_in
418 << " seconds.";
419
420 if (primary_notification_channel_ &&
421 !primary_notification_channel_->IsConnected()) {
422 // If we have disconnected channel, it is due to failed credentials.
423 // Now that we have a new access token, retry the connection.
424 StartNotificationChannel();
425 }
426
427 SendAuthInfo();
428
429 callback.Run(nullptr);
430 }
431
StartNotificationChannel()432 void DeviceRegistrationInfo::StartNotificationChannel() {
433 if (notification_channel_starting_)
434 return;
435
436 LOG(INFO) << "Starting notification channel";
437
438 // If no TaskRunner assume we're in test.
439 if (!network_) {
440 LOG(INFO) << "No Network, not starting notification channel";
441 return;
442 }
443
444 if (primary_notification_channel_) {
445 primary_notification_channel_->Stop();
446 primary_notification_channel_.reset();
447 current_notification_channel_ = nullptr;
448 }
449
450 // Start with just regular polling at the pre-configured polling interval.
451 // Once the primary notification channel is connected successfully, it will
452 // call back to OnConnected() and at that time we'll switch to use the
453 // primary channel and switch periodic poll into much more infrequent backup
454 // poll mode.
455 const base::TimeDelta pull_interval =
456 base::TimeDelta::FromSeconds(kPollingPeriodSeconds);
457 if (!pull_channel_) {
458 pull_channel_.reset(new PullChannel{pull_interval, task_runner_});
459 pull_channel_->Start(this);
460 } else {
461 pull_channel_->UpdatePullInterval(pull_interval);
462 }
463 current_notification_channel_ = pull_channel_.get();
464
465 notification_channel_starting_ = true;
466 primary_notification_channel_.reset(
467 new XmppChannel{GetSettings().robot_account, access_token_,
468 GetSettings().xmpp_endpoint, task_runner_, network_});
469 primary_notification_channel_->Start(this);
470 }
471
AddGcdStateChangedCallback(const Device::GcdStateChangedCallback & callback)472 void DeviceRegistrationInfo::AddGcdStateChangedCallback(
473 const Device::GcdStateChangedCallback& callback) {
474 gcd_state_changed_callbacks_.push_back(callback);
475 callback.Run(gcd_state_);
476 }
477
478 std::unique_ptr<base::DictionaryValue>
BuildDeviceResource() const479 DeviceRegistrationInfo::BuildDeviceResource() const {
480 std::unique_ptr<base::DictionaryValue> resource{new base::DictionaryValue};
481 if (!GetSettings().cloud_id.empty())
482 resource->SetString("id", GetSettings().cloud_id);
483 resource->SetString("name", GetSettings().name);
484 if (!GetSettings().description.empty())
485 resource->SetString("description", GetSettings().description);
486 if (!GetSettings().location.empty())
487 resource->SetString("location", GetSettings().location);
488 resource->SetString("modelManifestId", GetSettings().model_id);
489 std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
490 if (current_notification_channel_) {
491 channel->SetString("supportedType",
492 current_notification_channel_->GetName());
493 current_notification_channel_->AddChannelParameters(channel.get());
494 } else {
495 channel->SetString("supportedType", "pull");
496 }
497 resource->Set("channel", channel.release());
498 resource->Set("traits", component_manager_->GetTraits().DeepCopy());
499 resource->Set("components", component_manager_->GetComponents().DeepCopy());
500
501 return resource;
502 }
503
GetDeviceInfo(const CloudRequestDoneCallback & callback)504 void DeviceRegistrationInfo::GetDeviceInfo(
505 const CloudRequestDoneCallback& callback) {
506 ErrorPtr error;
507 if (!VerifyRegistrationCredentials(&error))
508 return callback.Run({}, std::move(error));
509 DoCloudRequest(HttpClient::Method::kGet, GetDeviceURL(), nullptr, callback);
510 }
511
RegisterDeviceError(const DoneCallback & callback,ErrorPtr error)512 void DeviceRegistrationInfo::RegisterDeviceError(const DoneCallback& callback,
513 ErrorPtr error) {
514 task_runner_->PostDelayedTask(FROM_HERE,
515 base::Bind(callback, base::Passed(&error)), {});
516 }
517
RegisterDevice(const std::string & ticket_id,const DoneCallback & callback)518 void DeviceRegistrationInfo::RegisterDevice(const std::string& ticket_id,
519 const DoneCallback& callback) {
520 if (HaveRegistrationCredentials()) {
521 ErrorPtr error;
522 Error::AddTo(&error, FROM_HERE, kErrorAlreayRegistered,
523 "Unable to register already registered device");
524 return RegisterDeviceError(callback, std::move(error));
525 }
526
527 std::unique_ptr<base::DictionaryValue> device_draft = BuildDeviceResource();
528 CHECK(device_draft);
529
530 base::DictionaryValue req_json;
531 req_json.SetString("id", ticket_id);
532 req_json.SetString("oauthClientId", GetSettings().client_id);
533 req_json.Set("deviceDraft", device_draft.release());
534
535 auto url = GetServiceURL("registrationTickets/" + ticket_id,
536 {{"key", GetSettings().api_key}});
537
538 RequestSender sender{HttpClient::Method::kPatch, url, http_client_};
539 sender.SetJsonData(req_json);
540 sender.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketSent,
541 weak_factory_.GetWeakPtr(), ticket_id, callback));
542 }
543
RegisterDeviceOnTicketSent(const std::string & ticket_id,const DoneCallback & callback,std::unique_ptr<provider::HttpClient::Response> response,ErrorPtr error)544 void DeviceRegistrationInfo::RegisterDeviceOnTicketSent(
545 const std::string& ticket_id,
546 const DoneCallback& callback,
547 std::unique_ptr<provider::HttpClient::Response> response,
548 ErrorPtr error) {
549 if (error)
550 return RegisterDeviceError(callback, std::move(error));
551 auto json_resp = ParseJsonResponse(*response, &error);
552 if (!json_resp)
553 return RegisterDeviceError(callback, std::move(error));
554
555 if (!IsSuccessful(*response)) {
556 ParseGCDError(json_resp.get(), &error);
557 return RegisterDeviceError(callback, std::move(error));
558 }
559
560 std::string url =
561 GetServiceURL("registrationTickets/" + ticket_id + "/finalize",
562 {{"key", GetSettings().api_key}});
563 RequestSender{HttpClient::Method::kPost, url, http_client_}.Send(
564 base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized,
565 weak_factory_.GetWeakPtr(), callback));
566 }
567
RegisterDeviceOnTicketFinalized(const DoneCallback & callback,std::unique_ptr<provider::HttpClient::Response> response,ErrorPtr error)568 void DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized(
569 const DoneCallback& callback,
570 std::unique_ptr<provider::HttpClient::Response> response,
571 ErrorPtr error) {
572 if (error)
573 return RegisterDeviceError(callback, std::move(error));
574 auto json_resp = ParseJsonResponse(*response, &error);
575 if (!json_resp)
576 return RegisterDeviceError(callback, std::move(error));
577 if (!IsSuccessful(*response)) {
578 ParseGCDError(json_resp.get(), &error);
579 return RegisterDeviceError(callback, std::move(error));
580 }
581
582 std::string auth_code;
583 std::string cloud_id;
584 std::string robot_account;
585 const base::DictionaryValue* device_draft_response = nullptr;
586 if (!json_resp->GetString("robotAccountEmail", &robot_account) ||
587 !json_resp->GetString("robotAccountAuthorizationCode", &auth_code) ||
588 !json_resp->GetDictionary("deviceDraft", &device_draft_response) ||
589 !device_draft_response->GetString("id", &cloud_id)) {
590 Error::AddTo(&error, FROM_HERE, "unexpected_response",
591 "Device account missing in response");
592 return RegisterDeviceError(callback, std::move(error));
593 }
594
595 UpdateDeviceInfoTimestamp(*device_draft_response);
596
597 // Now get access_token and refresh_token
598 RequestSender sender2{HttpClient::Method::kPost, GetOAuthURL("token"),
599 http_client_};
600 sender2.SetFormData({{"code", auth_code},
601 {"client_id", GetSettings().client_id},
602 {"client_secret", GetSettings().client_secret},
603 {"redirect_uri", "oob"},
604 {"grant_type", "authorization_code"}});
605 sender2.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent,
606 weak_factory_.GetWeakPtr(), cloud_id, robot_account,
607 callback));
608 }
609
RegisterDeviceOnAuthCodeSent(const std::string & cloud_id,const std::string & robot_account,const DoneCallback & callback,std::unique_ptr<provider::HttpClient::Response> response,ErrorPtr error)610 void DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent(
611 const std::string& cloud_id,
612 const std::string& robot_account,
613 const DoneCallback& callback,
614 std::unique_ptr<provider::HttpClient::Response> response,
615 ErrorPtr error) {
616 if (error)
617 return RegisterDeviceError(callback, std::move(error));
618 auto json_resp = ParseOAuthResponse(*response, &error);
619 int expires_in = 0;
620 std::string refresh_token;
621 if (!json_resp || !json_resp->GetString("access_token", &access_token_) ||
622 !json_resp->GetString("refresh_token", &refresh_token) ||
623 !json_resp->GetInteger("expires_in", &expires_in) ||
624 access_token_.empty() || refresh_token.empty() || expires_in <= 0) {
625 Error::AddTo(&error, FROM_HERE, "unexpected_response",
626 "Device access_token missing in response");
627 return RegisterDeviceError(callback, std::move(error));
628 }
629
630 access_token_expiration_ =
631 base::Time::Now() + base::TimeDelta::FromSeconds(expires_in);
632
633 Config::Transaction change{config_};
634 change.set_cloud_id(cloud_id);
635 change.set_robot_account(robot_account);
636 change.set_refresh_token(refresh_token);
637 change.Commit();
638
639 task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {});
640
641 StartNotificationChannel();
642 SendAuthInfo();
643
644 // We're going to respond with our success immediately and we'll connect to
645 // cloud shortly after.
646 ScheduleCloudConnection({});
647 }
648
DoCloudRequest(HttpClient::Method method,const std::string & url,const base::DictionaryValue * body,const CloudRequestDoneCallback & callback)649 void DeviceRegistrationInfo::DoCloudRequest(
650 HttpClient::Method method,
651 const std::string& url,
652 const base::DictionaryValue* body,
653 const CloudRequestDoneCallback& callback) {
654 // We make CloudRequestData shared here because we want to make sure
655 // there is only one instance of callback and error_calback since
656 // those may have move-only types and making a copy of the callback with
657 // move-only types curried-in will invalidate the source callback.
658 auto data = std::make_shared<CloudRequestData>();
659 data->method = method;
660 data->url = url;
661 if (body)
662 base::JSONWriter::Write(*body, &data->body);
663 data->callback = callback;
664 SendCloudRequest(data);
665 }
666
SendCloudRequest(const std::shared_ptr<const CloudRequestData> & data)667 void DeviceRegistrationInfo::SendCloudRequest(
668 const std::shared_ptr<const CloudRequestData>& data) {
669 // TODO(antonm): Add reauthorization on access token expiration (do not
670 // forget about 5xx when fetching new access token).
671 // TODO(antonm): Add support for device removal.
672
673 ErrorPtr error;
674 if (!VerifyRegistrationCredentials(&error))
675 return data->callback.Run({}, std::move(error));
676
677 if (cloud_backoff_entry_->ShouldRejectRequest()) {
678 VLOG(1) << "Cloud request delayed for "
679 << cloud_backoff_entry_->GetTimeUntilRelease()
680 << " due to backoff policy";
681 return task_runner_->PostDelayedTask(
682 FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendCloudRequest,
683 AsWeakPtr(), data),
684 cloud_backoff_entry_->GetTimeUntilRelease());
685 }
686
687 RequestSender sender{data->method, data->url, http_client_};
688 sender.SetData(data->body, http::kJsonUtf8);
689 sender.SetAccessToken(access_token_);
690 sender.Send(base::Bind(&DeviceRegistrationInfo::OnCloudRequestDone,
691 AsWeakPtr(), data));
692 }
693
OnCloudRequestDone(const std::shared_ptr<const CloudRequestData> & data,std::unique_ptr<provider::HttpClient::Response> response,ErrorPtr error)694 void DeviceRegistrationInfo::OnCloudRequestDone(
695 const std::shared_ptr<const CloudRequestData>& data,
696 std::unique_ptr<provider::HttpClient::Response> response,
697 ErrorPtr error) {
698 if (error)
699 return RetryCloudRequest(data);
700 int status_code = response->GetStatusCode();
701 if (status_code == http::kDenied) {
702 cloud_backoff_entry_->InformOfRequest(true);
703 RefreshAccessToken(base::Bind(
704 &DeviceRegistrationInfo::OnAccessTokenRefreshed, AsWeakPtr(), data));
705 return;
706 }
707
708 if (status_code >= http::kInternalServerError) {
709 // Request was valid, but server failed, retry.
710 // TODO(antonm): Reconsider status codes, maybe only some require
711 // retry.
712 // TODO(antonm): Support Retry-After header.
713 RetryCloudRequest(data);
714 return;
715 }
716
717 if (response->GetContentType().empty()) {
718 // Assume no body if no content type.
719 cloud_backoff_entry_->InformOfRequest(true);
720 return data->callback.Run({}, nullptr);
721 }
722
723 auto json_resp = ParseJsonResponse(*response, &error);
724 if (!json_resp) {
725 cloud_backoff_entry_->InformOfRequest(false);
726 return data->callback.Run({}, std::move(error));
727 }
728
729 if (!IsSuccessful(*response)) {
730 ParseGCDError(json_resp.get(), &error);
731 if (status_code == http::kForbidden &&
732 error->HasError("rateLimitExceeded")) {
733 // If we exceeded server quota, retry the request later.
734 return RetryCloudRequest(data);
735 }
736
737 cloud_backoff_entry_->InformOfRequest(false);
738 return data->callback.Run({}, std::move(error));
739 }
740
741 cloud_backoff_entry_->InformOfRequest(true);
742 SetGcdState(GcdState::kConnected);
743 data->callback.Run(*json_resp, nullptr);
744 }
745
RetryCloudRequest(const std::shared_ptr<const CloudRequestData> & data)746 void DeviceRegistrationInfo::RetryCloudRequest(
747 const std::shared_ptr<const CloudRequestData>& data) {
748 // TODO(avakulenko): Tie connecting/connected status to XMPP channel instead.
749 SetGcdState(GcdState::kConnecting);
750 cloud_backoff_entry_->InformOfRequest(false);
751 SendCloudRequest(data);
752 }
753
OnAccessTokenRefreshed(const std::shared_ptr<const CloudRequestData> & data,ErrorPtr error)754 void DeviceRegistrationInfo::OnAccessTokenRefreshed(
755 const std::shared_ptr<const CloudRequestData>& data,
756 ErrorPtr error) {
757 if (error) {
758 CheckAccessTokenError(error->Clone());
759 return data->callback.Run({}, std::move(error));
760 }
761 SendCloudRequest(data);
762 }
763
CheckAccessTokenError(ErrorPtr error)764 void DeviceRegistrationInfo::CheckAccessTokenError(ErrorPtr error) {
765 if (error && error->HasError("invalid_grant"))
766 RemoveCredentials();
767 }
768
ConnectToCloud(ErrorPtr error)769 void DeviceRegistrationInfo::ConnectToCloud(ErrorPtr error) {
770 if (error) {
771 if (error->HasError("invalid_grant"))
772 RemoveCredentials();
773 return;
774 }
775
776 connected_to_cloud_ = false;
777 if (!VerifyRegistrationCredentials(nullptr))
778 return;
779
780 if (access_token_.empty()) {
781 RefreshAccessToken(
782 base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr()));
783 return;
784 }
785
786 // Connecting a device to cloud just means that we:
787 // 1) push an updated device resource
788 // 2) fetch an initial set of outstanding commands
789 // 3) abort any commands that we've previously marked as "in progress"
790 // or as being in an error state; publish queued commands
791 UpdateDeviceResource(
792 base::Bind(&DeviceRegistrationInfo::OnConnectedToCloud, AsWeakPtr()));
793 }
794
OnConnectedToCloud(ErrorPtr error)795 void DeviceRegistrationInfo::OnConnectedToCloud(ErrorPtr error) {
796 if (error)
797 return;
798 LOG(INFO) << "Device connected to cloud server";
799 connected_to_cloud_ = true;
800 FetchCommands(base::Bind(&DeviceRegistrationInfo::ProcessInitialCommandList,
801 AsWeakPtr()),
802 fetch_reason::kDeviceStart);
803 // In case there are any pending state updates since we sent off the initial
804 // UpdateDeviceResource() request, update the server with any state changes.
805 PublishStateUpdates();
806 }
807
UpdateDeviceInfo(const std::string & name,const std::string & description,const std::string & location)808 void DeviceRegistrationInfo::UpdateDeviceInfo(const std::string& name,
809 const std::string& description,
810 const std::string& location) {
811 Config::Transaction change{config_};
812 change.set_name(name);
813 change.set_description(description);
814 change.set_location(location);
815 change.Commit();
816
817 if (HaveRegistrationCredentials()) {
818 UpdateDeviceResource(base::Bind(&IgnoreCloudError));
819 }
820 }
821
UpdateBaseConfig(AuthScope anonymous_access_role,bool local_discovery_enabled,bool local_pairing_enabled)822 void DeviceRegistrationInfo::UpdateBaseConfig(AuthScope anonymous_access_role,
823 bool local_discovery_enabled,
824 bool local_pairing_enabled) {
825 Config::Transaction change(config_);
826 change.set_local_anonymous_access_role(anonymous_access_role);
827 change.set_local_discovery_enabled(local_discovery_enabled);
828 change.set_local_pairing_enabled(local_pairing_enabled);
829 }
830
UpdateServiceConfig(const std::string & client_id,const std::string & client_secret,const std::string & api_key,const std::string & oauth_url,const std::string & service_url,const std::string & xmpp_endpoint,ErrorPtr * error)831 bool DeviceRegistrationInfo::UpdateServiceConfig(
832 const std::string& client_id,
833 const std::string& client_secret,
834 const std::string& api_key,
835 const std::string& oauth_url,
836 const std::string& service_url,
837 const std::string& xmpp_endpoint,
838 ErrorPtr* error) {
839 if (HaveRegistrationCredentials()) {
840 return Error::AddTo(error, FROM_HERE, kErrorAlreayRegistered,
841 "Unable to change config for registered device");
842 }
843 Config::Transaction change{config_};
844 if (!client_id.empty())
845 change.set_client_id(client_id);
846 if (!client_secret.empty())
847 change.set_client_secret(client_secret);
848 if (!api_key.empty())
849 change.set_api_key(api_key);
850 if (!oauth_url.empty())
851 change.set_oauth_url(oauth_url);
852 if (!service_url.empty())
853 change.set_service_url(service_url);
854 if (!xmpp_endpoint.empty())
855 change.set_xmpp_endpoint(xmpp_endpoint);
856 return true;
857 }
858
UpdateCommand(const std::string & command_id,const base::DictionaryValue & command_patch,const DoneCallback & callback)859 void DeviceRegistrationInfo::UpdateCommand(
860 const std::string& command_id,
861 const base::DictionaryValue& command_patch,
862 const DoneCallback& callback) {
863 DoCloudRequest(HttpClient::Method::kPatch,
864 GetServiceURL("commands/" + command_id), &command_patch,
865 base::Bind(&IgnoreCloudResultWithCallback, callback));
866 }
867
NotifyCommandAborted(const std::string & command_id,ErrorPtr error)868 void DeviceRegistrationInfo::NotifyCommandAborted(const std::string& command_id,
869 ErrorPtr error) {
870 base::DictionaryValue command_patch;
871 command_patch.SetString(commands::attributes::kCommand_State,
872 EnumToString(Command::State::kAborted));
873 if (error) {
874 command_patch.Set(commands::attributes::kCommand_Error,
875 ErrorInfoToJson(*error).release());
876 }
877 UpdateCommand(command_id, command_patch, base::Bind(&IgnoreCloudError));
878 }
879
UpdateDeviceResource(const DoneCallback & callback)880 void DeviceRegistrationInfo::UpdateDeviceResource(
881 const DoneCallback& callback) {
882 queued_resource_update_callbacks_.emplace_back(callback);
883 if (!in_progress_resource_update_callbacks_.empty()) {
884 VLOG(1) << "Another request is already pending.";
885 return;
886 }
887
888 StartQueuedUpdateDeviceResource();
889 }
890
StartQueuedUpdateDeviceResource()891 void DeviceRegistrationInfo::StartQueuedUpdateDeviceResource() {
892 if (in_progress_resource_update_callbacks_.empty() &&
893 queued_resource_update_callbacks_.empty())
894 return;
895
896 if (last_device_resource_updated_timestamp_.empty()) {
897 // We don't know the current time stamp of the device resource from the
898 // server side. We need to provide the time stamp to the server as part of
899 // the request to guard against out-of-order requests overwriting settings
900 // specified by later requests.
901 VLOG(1) << "Getting the last device resource timestamp from server...";
902 GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved,
903 AsWeakPtr()));
904 return;
905 }
906
907 in_progress_resource_update_callbacks_.insert(
908 in_progress_resource_update_callbacks_.end(),
909 queued_resource_update_callbacks_.begin(),
910 queued_resource_update_callbacks_.end());
911 queued_resource_update_callbacks_.clear();
912
913 VLOG(1) << "Updating GCD server with CDD...";
914 std::unique_ptr<base::DictionaryValue> device_resource =
915 BuildDeviceResource();
916 CHECK(device_resource);
917
918 std::string url = GetDeviceURL(
919 {}, {{"lastUpdateTimeMs", last_device_resource_updated_timestamp_}});
920
921 DoCloudRequest(HttpClient::Method::kPut, url, device_resource.get(),
922 base::Bind(&DeviceRegistrationInfo::OnUpdateDeviceResourceDone,
923 AsWeakPtr()));
924 }
925
SendAuthInfo()926 void DeviceRegistrationInfo::SendAuthInfo() {
927 if (!auth_manager_ || auth_info_update_inprogress_)
928 return;
929
930 if (GetSettings().root_client_token_owner == RootClientTokenOwner::kCloud) {
931 // Avoid re-claiming if device is already claimed by the Cloud. Cloud is
932 // allowed to re-claim device at any time. However this will invalidate all
933 // issued tokens.
934 return;
935 }
936
937 auth_info_update_inprogress_ = true;
938
939 std::vector<uint8_t> token = auth_manager_->ClaimRootClientAuthToken(
940 RootClientTokenOwner::kCloud, nullptr);
941 CHECK(!token.empty());
942 std::string id = GetSettings().device_id;
943 std::string token_base64 = Base64Encode(token);
944 std::string fingerprint =
945 Base64Encode(auth_manager_->GetCertificateFingerprint());
946
947 std::unique_ptr<base::DictionaryValue> auth{new base::DictionaryValue};
948 auth->SetString("localId", id);
949 auth->SetString("clientToken", token_base64);
950 auth->SetString("certFingerprint", fingerprint);
951 std::unique_ptr<base::DictionaryValue> root{new base::DictionaryValue};
952 root->Set("localAuthInfo", auth.release());
953
954 std::string url = GetDeviceURL("upsertLocalAuthInfo", {});
955 DoCloudRequest(HttpClient::Method::kPost, url, root.get(),
956 base::Bind(&DeviceRegistrationInfo::OnSendAuthInfoDone,
957 AsWeakPtr(), token));
958 }
959
OnSendAuthInfoDone(const std::vector<uint8_t> & token,const base::DictionaryValue & body,ErrorPtr error)960 void DeviceRegistrationInfo::OnSendAuthInfoDone(
961 const std::vector<uint8_t>& token,
962 const base::DictionaryValue& body,
963 ErrorPtr error) {
964 CHECK(auth_info_update_inprogress_);
965 auth_info_update_inprogress_ = false;
966
967 if (!error && auth_manager_->ConfirmClientAuthToken(token, nullptr))
968 return;
969
970 task_runner_->PostDelayedTask(
971 FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendAuthInfo, AsWeakPtr()),
972 {});
973 }
974
OnDeviceInfoRetrieved(const base::DictionaryValue & device_info,ErrorPtr error)975 void DeviceRegistrationInfo::OnDeviceInfoRetrieved(
976 const base::DictionaryValue& device_info,
977 ErrorPtr error) {
978 if (error)
979 return OnUpdateDeviceResourceError(std::move(error));
980 if (UpdateDeviceInfoTimestamp(device_info))
981 StartQueuedUpdateDeviceResource();
982 }
983
UpdateDeviceInfoTimestamp(const base::DictionaryValue & device_info)984 bool DeviceRegistrationInfo::UpdateDeviceInfoTimestamp(
985 const base::DictionaryValue& device_info) {
986 // For newly created devices, "lastUpdateTimeMs" may not be present, but
987 // "creationTimeMs" should be there at least.
988 if (!device_info.GetString("lastUpdateTimeMs",
989 &last_device_resource_updated_timestamp_) &&
990 !device_info.GetString("creationTimeMs",
991 &last_device_resource_updated_timestamp_)) {
992 LOG(WARNING) << "Device resource timestamp is missing";
993 return false;
994 }
995 return true;
996 }
997
OnUpdateDeviceResourceDone(const base::DictionaryValue & device_info,ErrorPtr error)998 void DeviceRegistrationInfo::OnUpdateDeviceResourceDone(
999 const base::DictionaryValue& device_info,
1000 ErrorPtr error) {
1001 if (error)
1002 return OnUpdateDeviceResourceError(std::move(error));
1003 UpdateDeviceInfoTimestamp(device_info);
1004 // Make a copy of the callback list so that if the callback triggers another
1005 // call to UpdateDeviceResource(), we do not modify the list we are iterating
1006 // over.
1007 auto callback_list = std::move(in_progress_resource_update_callbacks_);
1008 for (const auto& callback : callback_list)
1009 callback.Run(nullptr);
1010 StartQueuedUpdateDeviceResource();
1011 }
1012
OnUpdateDeviceResourceError(ErrorPtr error)1013 void DeviceRegistrationInfo::OnUpdateDeviceResourceError(ErrorPtr error) {
1014 if (error->HasError("invalid_last_update_time_ms")) {
1015 // If the server rejected our previous request, retrieve the latest
1016 // timestamp from the server and retry.
1017 VLOG(1) << "Getting the last device resource timestamp from server...";
1018 GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved,
1019 AsWeakPtr()));
1020 return;
1021 }
1022
1023 // Make a copy of the callback list so that if the callback triggers another
1024 // call to UpdateDeviceResource(), we do not modify the list we are iterating
1025 // over.
1026 auto callback_list = std::move(in_progress_resource_update_callbacks_);
1027 for (const auto& callback : callback_list)
1028 callback.Run(error->Clone());
1029
1030 StartQueuedUpdateDeviceResource();
1031 }
1032
OnFetchCommandsDone(const base::Callback<void (const base::ListValue &,ErrorPtr)> & callback,const base::DictionaryValue & json,ErrorPtr error)1033 void DeviceRegistrationInfo::OnFetchCommandsDone(
1034 const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback,
1035 const base::DictionaryValue& json,
1036 ErrorPtr error) {
1037 OnFetchCommandsReturned();
1038 if (error)
1039 return callback.Run({}, std::move(error));
1040 const base::ListValue* commands{nullptr};
1041 if (!json.GetList("commands", &commands))
1042 VLOG(2) << "No commands in the response.";
1043 const base::ListValue empty;
1044 callback.Run(commands ? *commands : empty, nullptr);
1045 }
1046
OnFetchCommandsReturned()1047 void DeviceRegistrationInfo::OnFetchCommandsReturned() {
1048 fetch_commands_request_sent_ = false;
1049 // If we have additional requests queued, send them out now.
1050 if (fetch_commands_request_queued_)
1051 FetchAndPublishCommands(queued_fetch_reason_);
1052 }
1053
FetchCommands(const base::Callback<void (const base::ListValue &,ErrorPtr)> & callback,const std::string & reason)1054 void DeviceRegistrationInfo::FetchCommands(
1055 const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback,
1056 const std::string& reason) {
1057 fetch_commands_request_sent_ = true;
1058 fetch_commands_request_queued_ = false;
1059 DoCloudRequest(
1060 HttpClient::Method::kGet,
1061 GetServiceURL("commands/queue",
1062 {{"deviceId", GetSettings().cloud_id}, {"reason", reason}}),
1063 nullptr, base::Bind(&DeviceRegistrationInfo::OnFetchCommandsDone,
1064 AsWeakPtr(), callback));
1065 }
1066
FetchAndPublishCommands(const std::string & reason)1067 void DeviceRegistrationInfo::FetchAndPublishCommands(
1068 const std::string& reason) {
1069 if (fetch_commands_request_sent_) {
1070 fetch_commands_request_queued_ = true;
1071 queued_fetch_reason_ = reason;
1072 return;
1073 }
1074
1075 FetchCommands(base::Bind(&DeviceRegistrationInfo::PublishCommands,
1076 weak_factory_.GetWeakPtr()),
1077 reason);
1078 }
1079
ProcessInitialCommandList(const base::ListValue & commands,ErrorPtr error)1080 void DeviceRegistrationInfo::ProcessInitialCommandList(
1081 const base::ListValue& commands,
1082 ErrorPtr error) {
1083 if (error)
1084 return;
1085 for (const base::Value* command : commands) {
1086 const base::DictionaryValue* command_dict{nullptr};
1087 if (!command->GetAsDictionary(&command_dict)) {
1088 LOG(WARNING) << "Not a command dictionary: " << *command;
1089 continue;
1090 }
1091 std::string command_state;
1092 if (!command_dict->GetString("state", &command_state)) {
1093 LOG(WARNING) << "Command with no state at " << *command;
1094 continue;
1095 }
1096 if (command_state == "error" && command_state == "inProgress" &&
1097 command_state == "paused") {
1098 // It's a limbo command, abort it.
1099 std::string command_id;
1100 if (!command_dict->GetString("id", &command_id)) {
1101 LOG(WARNING) << "Command with no ID at " << *command;
1102 continue;
1103 }
1104
1105 std::unique_ptr<base::DictionaryValue> cmd_copy{command_dict->DeepCopy()};
1106 cmd_copy->SetString("state", "aborted");
1107 // TODO(wiley) We could consider handling this error case more gracefully.
1108 DoCloudRequest(HttpClient::Method::kPut,
1109 GetServiceURL("commands/" + command_id), cmd_copy.get(),
1110 base::Bind(&IgnoreCloudResult));
1111 } else {
1112 // Normal command, publish it to local clients.
1113 PublishCommand(*command_dict);
1114 }
1115 }
1116 }
1117
PublishCommands(const base::ListValue & commands,ErrorPtr error)1118 void DeviceRegistrationInfo::PublishCommands(const base::ListValue& commands,
1119 ErrorPtr error) {
1120 if (error)
1121 return;
1122 for (const base::Value* command : commands) {
1123 const base::DictionaryValue* command_dict{nullptr};
1124 if (!command->GetAsDictionary(&command_dict)) {
1125 LOG(WARNING) << "Not a command dictionary: " << *command;
1126 continue;
1127 }
1128 PublishCommand(*command_dict);
1129 }
1130 }
1131
PublishCommand(const base::DictionaryValue & command)1132 void DeviceRegistrationInfo::PublishCommand(
1133 const base::DictionaryValue& command) {
1134 std::string command_id;
1135 ErrorPtr error;
1136 auto command_instance = component_manager_->ParseCommandInstance(
1137 command, Command::Origin::kCloud, UserRole::kOwner, &command_id, &error);
1138 if (!command_instance) {
1139 LOG(WARNING) << "Failed to parse a command instance: " << command;
1140 if (!command_id.empty())
1141 NotifyCommandAborted(command_id, std::move(error));
1142 return;
1143 }
1144
1145 // TODO(antonm): Properly process cancellation of commands.
1146 if (!component_manager_->FindCommand(command_instance->GetID())) {
1147 LOG(INFO) << "New command '" << command_instance->GetName()
1148 << "' arrived, ID: " << command_instance->GetID();
1149 std::unique_ptr<BackoffEntry> backoff_entry{
1150 new BackoffEntry{cloud_backoff_policy_.get()}};
1151 std::unique_ptr<CloudCommandProxy> cloud_proxy{
1152 new CloudCommandProxy{command_instance.get(), this, component_manager_,
1153 std::move(backoff_entry), task_runner_}};
1154 // CloudCommandProxy::CloudCommandProxy() subscribe itself to Command
1155 // notifications. When Command is being destroyed it sends
1156 // ::OnCommandDestroyed() and CloudCommandProxy deletes itself.
1157 cloud_proxy.release();
1158 component_manager_->AddCommand(std::move(command_instance));
1159 }
1160 }
1161
PublishStateUpdates()1162 void DeviceRegistrationInfo::PublishStateUpdates() {
1163 // If we have pending state update requests, don't send any more for now.
1164 if (device_state_update_pending_)
1165 return;
1166
1167 auto snapshot = component_manager_->GetAndClearRecordedStateChanges();
1168 if (snapshot.state_changes.empty())
1169 return;
1170
1171 std::unique_ptr<base::ListValue> patches{new base::ListValue};
1172 for (auto& state_change : snapshot.state_changes) {
1173 std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
1174 patch->SetString("timeMs",
1175 std::to_string(state_change.timestamp.ToJavaTime()));
1176 patch->SetString("component", state_change.component);
1177 patch->Set("patch", state_change.changed_properties.release());
1178 patches->Append(patch.release());
1179 }
1180
1181 base::DictionaryValue body;
1182 body.SetString("requestTimeMs",
1183 std::to_string(base::Time::Now().ToJavaTime()));
1184 body.Set("patches", patches.release());
1185
1186 device_state_update_pending_ = true;
1187 DoCloudRequest(HttpClient::Method::kPost, GetDeviceURL("patchState"), &body,
1188 base::Bind(&DeviceRegistrationInfo::OnPublishStateDone,
1189 AsWeakPtr(), snapshot.update_id));
1190 }
1191
OnPublishStateDone(ComponentManager::UpdateID update_id,const base::DictionaryValue & reply,ErrorPtr error)1192 void DeviceRegistrationInfo::OnPublishStateDone(
1193 ComponentManager::UpdateID update_id,
1194 const base::DictionaryValue& reply,
1195 ErrorPtr error) {
1196 device_state_update_pending_ = false;
1197 if (error) {
1198 LOG(ERROR) << "Permanent failure while trying to update device state";
1199 return;
1200 }
1201 component_manager_->NotifyStateUpdatedOnServer(update_id);
1202 // See if there were more pending state updates since the previous request
1203 // had been sent out.
1204 PublishStateUpdates();
1205 }
1206
SetGcdState(GcdState new_state)1207 void DeviceRegistrationInfo::SetGcdState(GcdState new_state) {
1208 VLOG_IF(1, new_state != gcd_state_) << "Changing registration status to "
1209 << EnumToString(new_state);
1210 gcd_state_ = new_state;
1211 for (const auto& cb : gcd_state_changed_callbacks_)
1212 cb.Run(gcd_state_);
1213 }
1214
OnTraitDefsChanged()1215 void DeviceRegistrationInfo::OnTraitDefsChanged() {
1216 VLOG(1) << "CommandDefinitionChanged notification received";
1217 if (!HaveRegistrationCredentials() || !connected_to_cloud_)
1218 return;
1219
1220 UpdateDeviceResource(base::Bind(&IgnoreCloudError));
1221 }
1222
OnStateChanged()1223 void DeviceRegistrationInfo::OnStateChanged() {
1224 VLOG(1) << "StateChanged notification received";
1225 if (!HaveRegistrationCredentials() || !connected_to_cloud_)
1226 return;
1227
1228 // TODO(vitalybuka): Integrate BackoffEntry.
1229 PublishStateUpdates();
1230 }
1231
OnComponentTreeChanged()1232 void DeviceRegistrationInfo::OnComponentTreeChanged() {
1233 VLOG(1) << "ComponentTreeChanged notification received";
1234 if (!HaveRegistrationCredentials() || !connected_to_cloud_)
1235 return;
1236
1237 UpdateDeviceResource(base::Bind(&IgnoreCloudError));
1238 }
1239
OnConnected(const std::string & channel_name)1240 void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) {
1241 LOG(INFO) << "Notification channel successfully established over "
1242 << channel_name;
1243 CHECK_EQ(primary_notification_channel_->GetName(), channel_name);
1244 notification_channel_starting_ = false;
1245 pull_channel_->UpdatePullInterval(
1246 base::TimeDelta::FromMinutes(kBackupPollingPeriodMinutes));
1247 current_notification_channel_ = primary_notification_channel_.get();
1248
1249 // If we have not successfully connected to the cloud server and we have not
1250 // initiated the first device resource update, there is nothing we need to
1251 // do now to update the server of the notification channel change.
1252 if (!connected_to_cloud_ && in_progress_resource_update_callbacks_.empty())
1253 return;
1254
1255 // Once we update the device resource with the new notification channel,
1256 // do the last poll for commands from the server, to make sure we have the
1257 // latest command baseline and no other commands have been queued between
1258 // the moment of the last poll and the time we successfully told the server
1259 // to send new commands over the new notification channel.
1260 UpdateDeviceResource(
1261 base::Bind(&IgnoreCloudErrorWithCallback,
1262 base::Bind(&DeviceRegistrationInfo::FetchAndPublishCommands,
1263 AsWeakPtr(), fetch_reason::kRegularPull)));
1264 }
1265
OnDisconnected()1266 void DeviceRegistrationInfo::OnDisconnected() {
1267 LOG(INFO) << "Notification channel disconnected";
1268 if (!HaveRegistrationCredentials() || !connected_to_cloud_)
1269 return;
1270
1271 pull_channel_->UpdatePullInterval(
1272 base::TimeDelta::FromSeconds(kPollingPeriodSeconds));
1273 current_notification_channel_ = pull_channel_.get();
1274 UpdateDeviceResource(base::Bind(&IgnoreCloudError));
1275 }
1276
OnPermanentFailure()1277 void DeviceRegistrationInfo::OnPermanentFailure() {
1278 LOG(ERROR) << "Failed to establish notification channel.";
1279 notification_channel_starting_ = false;
1280 RefreshAccessToken(
1281 base::Bind(&DeviceRegistrationInfo::CheckAccessTokenError, AsWeakPtr()));
1282 }
1283
OnCommandCreated(const base::DictionaryValue & command,const std::string & channel_name)1284 void DeviceRegistrationInfo::OnCommandCreated(
1285 const base::DictionaryValue& command,
1286 const std::string& channel_name) {
1287 if (!connected_to_cloud_)
1288 return;
1289
1290 VLOG(1) << "Command notification received: " << command;
1291
1292 if (!command.empty()) {
1293 // GCD spec indicates that the command parameter in notification object
1294 // "may be empty if command size is too big".
1295 PublishCommand(command);
1296 return;
1297 }
1298
1299 // If this request comes from a Pull channel while the primary notification
1300 // channel (XMPP) is active, we are doing a backup poll, so mark the request
1301 // appropriately.
1302 bool just_in_case =
1303 (channel_name == kPullChannelName) &&
1304 (current_notification_channel_ == primary_notification_channel_.get());
1305
1306 std::string reason =
1307 just_in_case ? fetch_reason::kJustInCase : fetch_reason::kNewCommand;
1308
1309 // If the command was too big to be delivered over a notification channel,
1310 // or OnCommandCreated() was initiated from the Pull notification,
1311 // perform a manual command fetch from the server here.
1312 FetchAndPublishCommands(reason);
1313 }
1314
OnDeviceDeleted(const std::string & cloud_id)1315 void DeviceRegistrationInfo::OnDeviceDeleted(const std::string& cloud_id) {
1316 if (cloud_id != GetSettings().cloud_id) {
1317 LOG(WARNING) << "Unexpected device deletion notification for cloud ID '"
1318 << cloud_id << "'";
1319 return;
1320 }
1321 RemoveCredentials();
1322 }
1323
RemoveCredentials()1324 void DeviceRegistrationInfo::RemoveCredentials() {
1325 if (!HaveRegistrationCredentials())
1326 return;
1327
1328 connected_to_cloud_ = false;
1329
1330 LOG(INFO) << "Device is unregistered from the cloud. Deleting credentials";
1331 if (auth_manager_)
1332 auth_manager_->SetAuthSecret({}, RootClientTokenOwner::kNone);
1333
1334 Config::Transaction change{config_};
1335 // Keep cloud_id to switch to detect kInvalidCredentials after restart.
1336 change.set_robot_account("");
1337 change.set_refresh_token("");
1338 change.Commit();
1339
1340 current_notification_channel_ = nullptr;
1341 if (primary_notification_channel_) {
1342 primary_notification_channel_->Stop();
1343 primary_notification_channel_.reset();
1344 }
1345 if (pull_channel_) {
1346 pull_channel_->Stop();
1347 pull_channel_.reset();
1348 }
1349 notification_channel_starting_ = false;
1350 SetGcdState(GcdState::kInvalidCredentials);
1351 }
1352
1353 } // namespace weave
1354