1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "lights_observer.h"
17 
18 #include <android-base/logging.h>
19 #include <chrono>
20 #include "common/libs/utils/vsock_connection.h"
21 
22 #include <json/json.h>
23 
24 namespace cuttlefish {
25 namespace webrtc_streaming {
26 
LightsObserver(unsigned int port,unsigned int cid,bool vhost_user_vsock)27 LightsObserver::LightsObserver(unsigned int port, unsigned int cid,
28                                bool vhost_user_vsock)
29     : cid_(cid),
30       port_(port),
31       vhost_user_vsock_(vhost_user_vsock),
32       is_running_(false),
33       session_active_(false),
34       last_client_channel_id_(-1) {}
35 
~LightsObserver()36 LightsObserver::~LightsObserver() { Stop(); }
37 
Start()38 bool LightsObserver::Start() {
39   if (connection_thread_.joinable()) {
40     LOG(ERROR) << "Connection thread is already running.";
41     return false;
42   }
43 
44   is_running_ = true;
45 
46   connection_thread_ = std::thread([this] {
47     while (is_running_) {
48       while (cvd_connection_.IsConnected()) {
49         ReadServerMessages();
50       }
51 
52       // Try to start a new connection. If this fails, delay retrying a bit.
53       if (is_running_ &&
54           !cvd_connection_.Connect(
55               port_, cid_,
56               vhost_user_vsock_
57                   ? std::optional(0) /* any value is okay for client */
58                   : std::nullopt)) {
59         std::this_thread::sleep_for(std::chrono::milliseconds(1000));
60         continue;
61       }
62     }
63 
64     LOG(INFO) << "Exiting connection thread";
65   });
66 
67   LOG(INFO) << "Connection thread running";
68   return true;
69 }
70 
Stop()71 void LightsObserver::Stop() {
72   is_running_ = false;
73   cvd_connection_.Disconnect();
74 
75   // The connection_thread_ should finish at any point now. Let's join it.
76   if (connection_thread_.joinable()) {
77     connection_thread_.join();
78   }
79 }
80 
ReadServerMessages()81 void LightsObserver::ReadServerMessages() {
82   static constexpr auto kEventKey = "event";
83   static constexpr auto kMessageStart = "VIRTUAL_DEVICE_START_LIGHTS_SESSION";
84   static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION";
85   static constexpr auto kMessageUpdate = "VIRTUAL_DEVICE_LIGHTS_UPDATE";
86 
87   auto json_value = cvd_connection_.ReadJsonMessage();
88 
89   if (json_value[kEventKey] == kMessageStart) {
90     session_active_ = true;
91   } else if (json_value[kEventKey] == kMessageStop) {
92     session_active_ = false;
93   } else if (json_value[kEventKey] == kMessageUpdate && session_active_) {
94     // Cache the latest update for when new clients register
95     std::lock_guard<std::mutex> lock(clients_lock_);
96     cached_latest_update_ = json_value;
97 
98     // Send update to all subscribed clients
99     for (auto itr = client_message_senders_.begin();
100          itr != client_message_senders_.end(); itr++) {
101       itr->second(json_value);
102     }
103   }
104 }
105 
Subscribe(std::function<bool (const Json::Value &)> lights_message_sender)106 int LightsObserver::Subscribe(
107     std::function<bool(const Json::Value&)> lights_message_sender) {
108   int client_id = -1;
109   {
110     std::lock_guard<std::mutex> lock(clients_lock_);
111     client_id = ++last_client_channel_id_;
112     client_message_senders_[client_id] = lights_message_sender;
113 
114     if (!cached_latest_update_.empty()) {
115       lights_message_sender(cached_latest_update_);
116     }
117   }
118 
119   return client_id;
120 }
121 
Unsubscribe(int id)122 void LightsObserver::Unsubscribe(int id) {
123   std::lock_guard<std::mutex> lock(clients_lock_);
124   client_message_senders_.erase(id);
125 }
126 
127 }  // namespace webrtc_streaming
128 }  // namespace cuttlefish
129