/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "lights_observer.h"

// NOTE(review): the angle-bracket include targets below were missing from the
// reviewed text and have been reconstructed from the names this file uses
// (LOG, std::chrono, Json::Value) -- confirm against version control.
#include <android-base/logging.h>
#include <chrono>
#include "common/libs/utils/vsock_connection.h"

#include <json/json.h>

namespace cuttlefish {
namespace webrtc_streaming {

// Stores the connection parameters. No connection is attempted until Start()
// is called.
LightsObserver::LightsObserver(unsigned int port, unsigned int cid,
                               bool vhost_user_vsock)
    : cid_(cid),
      port_(port),
      vhost_user_vsock_(vhost_user_vsock),
      is_running_(false),
      session_active_(false),
      last_client_channel_id_(-1) {}

// Stops the connection thread (if any) before the members are destroyed.
LightsObserver::~LightsObserver() { Stop(); }

// Spawns the connection thread, which alternates between draining messages
// from an established connection and trying to (re)connect.
//
// Returns false, without side effects, if the connection thread is still
// joinable (i.e. Start() was already called and Stop() has not run since);
// returns true once the thread has been created.
bool LightsObserver::Start() {
  if (connection_thread_.joinable()) {
    LOG(ERROR) << "Connection thread is already running.";
    return false;
  }

  is_running_ = true;

  connection_thread_ = std::thread([this] {
    while (is_running_) {
      // Drain messages for as long as the connection is up. Stop() breaks
      // this loop by disconnecting.
      while (cvd_connection_.IsConnected()) {
        ReadServerMessages();
      }

      // Try to start a new connection. If this fails, delay retrying a bit.
      if (is_running_ &&
          !cvd_connection_.Connect(
              port_, cid_,
              vhost_user_vsock_
                  ? std::optional(0) /* any value is okay for client */
                  : std::nullopt)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
      }
    }

    LOG(INFO) << "Exiting connection thread";
  });

  LOG(INFO) << "Connection thread running";
  return true;
}

// Clears the running flag, disconnects, and joins the connection thread if
// there is one to join.
void LightsObserver::Stop() {
  is_running_ = false;
  cvd_connection_.Disconnect();

  // The connection_thread_ should finish at any point now. Let's join it.
  if (connection_thread_.joinable()) {
    connection_thread_.join();
  }
}

// Reads a single JSON message from the connection and dispatches on its
// "event" member:
//   - VIRTUAL_DEVICE_START_LIGHTS_SESSION: marks the session active.
//   - VIRTUAL_DEVICE_STOP_LIGHTS_SESSION:  marks the session inactive.
//   - VIRTUAL_DEVICE_LIGHTS_UPDATE (only while the session is active): caches
//     the message and forwards it to every subscribed sender.
// Anything else is ignored.
void LightsObserver::ReadServerMessages() {
  static constexpr auto kEventKey = "event";
  static constexpr auto kMessageStart = "VIRTUAL_DEVICE_START_LIGHTS_SESSION";
  static constexpr auto kMessageStop = "VIRTUAL_DEVICE_STOP_LIGHTS_SESSION";
  static constexpr auto kMessageUpdate = "VIRTUAL_DEVICE_LIGHTS_UPDATE";

  auto json_value = cvd_connection_.ReadJsonMessage();

  // Json::Value::operator[] is only valid on null or object values and, in its
  // non-const form, inserts a null member when the key is missing. Reject
  // anything that is not an object carrying an "event" member up front so a
  // malformed (but well-formed JSON) payload from the peer can neither trip
  // jsoncpp's internal assertion nor be silently mutated.
  if (!json_value.isObject() || !json_value.isMember(kEventKey)) {
    return;
  }
  const Json::Value& event = json_value[kEventKey];

  if (event == kMessageStart) {
    session_active_ = true;
  } else if (event == kMessageStop) {
    session_active_ = false;
  } else if (event == kMessageUpdate && session_active_) {
    // Cache the latest update for when new clients register
    std::lock_guard lock(clients_lock_);
    cached_latest_update_ = json_value;

    // Send update to all subscribed clients. The senders run while
    // clients_lock_ is held.
    for (const auto& [client_id, sender] : client_message_senders_) {
      sender(json_value);
    }
  }
}

// Registers `lights_message_sender` to receive future lights updates and
// returns the id to pass to Unsubscribe(). If an update has already been
// cached, it is delivered to the new sender immediately, before returning.
//
// NOTE(review): the template argument of std::function was missing from the
// reviewed text; `bool(const Json::Value&)` is reconstructed from how the
// senders are invoked and must match the declaration in lights_observer.h.
int LightsObserver::Subscribe(
    std::function<bool(const Json::Value&)> lights_message_sender) {
  std::lock_guard lock(clients_lock_);

  const int client_id = ++last_client_channel_id_;
  client_message_senders_[client_id] = lights_message_sender;

  if (!cached_latest_update_.empty()) {
    lights_message_sender(cached_latest_update_);
  }

  return client_id;
}

// Removes the sender registered under `id`; does nothing if there is none.
void LightsObserver::Unsubscribe(int id) {
  std::lock_guard lock(clients_lock_);
  client_message_senders_.erase(id);
}

}  // namespace webrtc_streaming
}  // namespace cuttlefish