1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "lights_observer.h"
17
18 #include <android-base/logging.h>
19 #include <chrono>
20 #include "common/libs/utils/vsock_connection.h"
21
22 #include <json/json.h>
23
24 namespace cuttlefish {
25 namespace webrtc_streaming {
26
LightsObserver(unsigned int port,unsigned int cid,bool vhost_user_vsock)27 LightsObserver::LightsObserver(unsigned int port, unsigned int cid,
28 bool vhost_user_vsock)
29 : cid_(cid),
30 port_(port),
31 vhost_user_vsock_(vhost_user_vsock),
32 is_running_(false),
33 session_active_(false),
34 last_client_channel_id_(-1) {}
35
~LightsObserver()36 LightsObserver::~LightsObserver() { Stop(); }
37
Start()38 bool LightsObserver::Start() {
39 if (connection_thread_.joinable()) {
40 LOG(ERROR) << "Connection thread is already running.";
41 return false;
42 }
43
44 is_running_ = true;
45
46 connection_thread_ = std::thread([this] {
47 while (is_running_) {
48 while (cvd_connection_.IsConnected()) {
49 ReadServerMessages();
50 }
51
52 // Try to start a new connection. If this fails, delay retrying a bit.
53 if (is_running_ &&
54 !cvd_connection_.Connect(
55 port_, cid_,
56 vhost_user_vsock_
57 ? std::optional(0) /* any value is okay for client */
58 : std::nullopt)) {
59 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
60 continue;
61 }
62 }
63
64 LOG(INFO) << "Exiting connection thread";
65 });
66
67 LOG(INFO) << "Connection thread running";
68 return true;
69 }
70
Stop()71 void LightsObserver::Stop() {
72 is_running_ = false;
73 cvd_connection_.Disconnect();
74
75 // The connection_thread_ should finish at any point now. Let's join it.
76 if (connection_thread_.joinable()) {
77 connection_thread_.join();
78 }
79 }
80
ReadServerMessages()81 void LightsObserver::ReadServerMessages() {
82 static constexpr auto kEventKey = "event";
83 static constexpr auto kMessageStart = "VIRTUAL_DEVICE_START_LIGHTS_SESSION";
84 static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION";
85 static constexpr auto kMessageUpdate = "VIRTUAL_DEVICE_LIGHTS_UPDATE";
86
87 auto json_value = cvd_connection_.ReadJsonMessage();
88
89 if (json_value[kEventKey] == kMessageStart) {
90 session_active_ = true;
91 } else if (json_value[kEventKey] == kMessageStop) {
92 session_active_ = false;
93 } else if (json_value[kEventKey] == kMessageUpdate && session_active_) {
94 // Cache the latest update for when new clients register
95 std::lock_guard<std::mutex> lock(clients_lock_);
96 cached_latest_update_ = json_value;
97
98 // Send update to all subscribed clients
99 for (auto itr = client_message_senders_.begin();
100 itr != client_message_senders_.end(); itr++) {
101 itr->second(json_value);
102 }
103 }
104 }
105
Subscribe(std::function<bool (const Json::Value &)> lights_message_sender)106 int LightsObserver::Subscribe(
107 std::function<bool(const Json::Value&)> lights_message_sender) {
108 int client_id = -1;
109 {
110 std::lock_guard<std::mutex> lock(clients_lock_);
111 client_id = ++last_client_channel_id_;
112 client_message_senders_[client_id] = lights_message_sender;
113
114 if (!cached_latest_update_.empty()) {
115 lights_message_sender(cached_latest_update_);
116 }
117 }
118
119 return client_id;
120 }
121
Unsubscribe(int id)122 void LightsObserver::Unsubscribe(int id) {
123 std::lock_guard<std::mutex> lock(clients_lock_);
124 client_message_senders_.erase(id);
125 }
126
127 } // namespace webrtc_streaming
128 } // namespace cuttlefish
129