/* * Copyright 2023 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "mmc/daemon/service.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "common/message_loop_thread.h" #include "mmc/codec_server/hfp_lc3_mmc_decoder.h" #include "mmc/codec_server/hfp_lc3_mmc_encoder.h" #include "mmc/daemon/constants.h" #include "mmc/mmc_interface/mmc_interface.h" #include "mmc/proto/mmc_service.pb.h" #if !defined(EXCLUDE_NONSTANDARD_CODECS) #include "mmc/codec_server/a2dp_aac_mmc_encoder.h" #endif namespace mmc { namespace { using namespace bluetooth; // Task that would run on the thread. void StartSocketListener(int fd, struct sockaddr_un addr, std::promise task_ended, std::unique_ptr codec_server) { socklen_t addr_size = sizeof(struct sockaddr_un); int client_fd = accept(fd, (struct sockaddr*)&addr, &addr_size); // |fd| is only used for accept. close(fd); if (client_fd < 0) { log::error("Failed to accept: {}", strerror(errno)); codec_server.release(); task_ended.set_value(); return; } std::array i_buf = {}; std::array o_buf = {}; struct pollfd pfd; pfd.fd = client_fd; pfd.events = POLLIN; while (1) { // Blocking poll. int poll_ret = poll(&pfd, 1, -1); if (poll_ret <= 0) { log::error("Poll failed: {}", strerror(errno)); break; } // Ignore remaining data in the closed socket. if (pfd.revents & (POLLHUP | POLLNVAL)) { log::info("Socket disconnected"); break; } int i_data_len = recv(client_fd, i_buf.data(), kMaximumBufferSize, MSG_NOSIGNAL); if (i_data_len <= 0) { log::error("Failed to recv data: {}", strerror(errno)); break; } // Start transcode. int o_data_len = codec_server->transcode(i_buf.data(), i_data_len, o_buf.data(), kMaximumBufferSize); if (o_data_len < 0) { log::error("Failed to transcode: {}", strerror(-o_data_len)); break; } int sent_rc = send(client_fd, o_buf.data(), o_data_len, MSG_NOSIGNAL); if (sent_rc <= 0) { log::error("Failed to send data: {}", strerror(errno)); break; } o_buf.fill(0); } close(client_fd); unlink(addr.sun_path); codec_server.release(); task_ended.set_value(); return; } } // namespace Service::Service(base::OnceClosure shutdown_callback) : shutdown_callback_(std::move(shutdown_callback)), weak_ptr_factory_(this) {} bool Service::Init() { // Set up the dbus service. dbus::Bus::Options opts; opts.bus_type = dbus::Bus::SYSTEM; bus_ = new dbus::Bus(std::move(opts)); if (!bus_->Connect()) { log::error("Failed to connect to system bus"); return false; } exported_object_ = bus_->GetExportedObject(dbus::ObjectPath(kMmcServicePath)); if (!exported_object_) { log::error("Failed to export {} object", kMmcServicePath); return false; } using ServiceMethod = void (Service::*)(dbus::MethodCall*, dbus::ExportedObject::ResponseSender); const std::map kServiceMethods = { {kCodecInitMethod, &Service::CodecInit}, {kCodecCleanUpMethod, &Service::CodecCleanUp}, }; for (const auto& iter : kServiceMethods) { bool ret = exported_object_->ExportMethodAndBlock( kMmcServiceInterface, iter.first, base::BindRepeating(iter.second, weak_ptr_factory_.GetWeakPtr())); if (!ret) { log::error("Failed to export method: {}", iter.first); return false; } } if (!bus_->RequestOwnershipAndBlock(kMmcServiceName, dbus::Bus::REQUIRE_PRIMARY)) { log::error("Failed to take ownership of {}", kMmcServiceName); return false; } return true; } void Service::CodecInit(dbus::MethodCall* method_call, dbus::ExportedObject::ResponseSender sender) { dbus::MessageReader reader(method_call); auto dbus_response = dbus::Response::FromMethodCall(method_call); dbus::MessageWriter writer(dbus_response.get()); CodecInitRequest request; CodecInitResponse response; if (!reader.PopArrayOfBytesAsProto(&request)) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Unable to parse CodecInitRequest from message")); return; } if (!request.has_config()) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "'Config Param' must be set")); return; } // Create codec server instance. std::unique_ptr codec_server; if (request.config().has_hfp_lc3_decoder_param()) { codec_server = std::make_unique(); } else if (request.config().has_hfp_lc3_encoder_param()) { codec_server = std::make_unique(); } #if !defined(EXCLUDE_NONSTANDARD_CODECS) else if (request.config().has_a2dp_aac_encoder_param()) { codec_server = std::make_unique(); } #endif else { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Codec type must be specified")); return; } int frame_size = codec_server->init(request.config()); if (frame_size < 0) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Init codec server failed: " + std::string(strerror(-frame_size)))); return; } response.set_input_frame_size(frame_size); // Generate socket name for client. std::string socket_path = std::string(kMmcSocketName) + base::UnguessableToken::Create().ToString(); response.set_socket_token(socket_path); int skt_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0); if (skt_fd < 0) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Create socket failed: " + std::string(strerror(errno)))); return; } struct sockaddr_un addr = {}; addr.sun_family = AF_UNIX; strncpy(addr.sun_path, response.socket_token().c_str(), sizeof(addr.sun_path) - 1); unlink(addr.sun_path); if (bind(skt_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Bind socket failed: " + std::string(strerror(errno)))); return; } // mmc_service group can read/write the socket. int rc = chmod(addr.sun_path, 0770); if (rc < 0) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Chmod socket failed: " + std::string(strerror(errno)))); return; } if (listen(skt_fd, kClientMaximum) == -1) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "Listen socket failed: " + std::string(strerror(errno)))); return; } // Create a thread and pass codec server and socket fd to it. if (!StartWorkerThread(skt_fd, std::move(addr), std::move(codec_server))) { std::move(sender).Run(dbus::ErrorResponse::FromMethodCall( method_call, kMmcServiceError, "No free thread available")); return; } writer.AppendProtoAsArrayOfBytes(response); std::move(sender).Run(std::move(dbus_response)); return; } void Service::CodecCleanUp(dbus::MethodCall* method_call, dbus::ExportedObject::ResponseSender sender) { auto dbus_response = dbus::Response::FromMethodCall(method_call); RemoveIdleThread(); std::move(sender).Run(std::move(dbus_response)); return; } bool Service::StartWorkerThread(int fd, struct sockaddr_un addr, std::unique_ptr codec_server) { // Each thread has its associated future to indicate task completion. std::promise task_ended; thread_pool_.push_back(std::make_pair( std::make_unique(kWorkerThreadName), std::make_unique>(task_ended.get_future()))); // Start up thread and assign task to it. thread_pool_.back().first->StartUp(); if (!thread_pool_.back().first->IsRunning()) { log::error("Failed to start thread"); return false; } // Real-time scheduling increases thread priority. // Without it, the thread still works. if (!thread_pool_.back().first->EnableRealTimeScheduling()) { log::warn("Failed to enable real time scheduling"); } if (!thread_pool_.back().first->DoInThread( FROM_HERE, base::BindOnce(&StartSocketListener, fd, std::move(addr), std::move(task_ended), std::move(codec_server)))) { log::error("Failed to run task"); return false; } return true; } void Service::RemoveIdleThread() { for (auto thread = thread_pool_.begin(); thread != thread_pool_.end();) { if (thread->second->wait_for(std::chrono::milliseconds( kThreadCheckTimeout)) == std::future_status::ready) { // The task is over, close the thread and remove it from the thread pool. thread->first->ShutDown(); thread = thread_pool_.erase(thread); } else { thread++; } } } } // namespace mmc