1 /*
2  * Copyright 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "mmc/daemon/service.h"
18 
19 #include <base/functional/bind.h>
20 #include <base/functional/callback_helpers.h>
21 #include <base/stl_util.h>
22 #include <base/task/single_thread_task_runner.h>
23 #include <base/unguessable_token.h>
24 #include <bluetooth/log.h>
25 #include <poll.h>
26 #include <sys/socket.h>
27 #include <sys/stat.h>
28 #include <sys/un.h>
29 #include <unistd.h>
30 
31 #include <cerrno>
32 #include <cstring>
33 #include <future>
34 
35 #include "common/message_loop_thread.h"
36 #include "mmc/codec_server/hfp_lc3_mmc_decoder.h"
37 #include "mmc/codec_server/hfp_lc3_mmc_encoder.h"
38 #include "mmc/daemon/constants.h"
39 #include "mmc/mmc_interface/mmc_interface.h"
40 #include "mmc/proto/mmc_service.pb.h"
41 
42 #if !defined(EXCLUDE_NONSTANDARD_CODECS)
43 #include "mmc/codec_server/a2dp_aac_mmc_encoder.h"
44 #endif
45 
46 namespace mmc {
47 namespace {
48 
49 using namespace bluetooth;
50 
51 // Task that would run on the thread.
StartSocketListener(int fd,struct sockaddr_un addr,std::promise<void> task_ended,std::unique_ptr<MmcInterface> codec_server)52 void StartSocketListener(int fd, struct sockaddr_un addr,
53                          std::promise<void> task_ended,
54                          std::unique_ptr<MmcInterface> codec_server) {
55   socklen_t addr_size = sizeof(struct sockaddr_un);
56   int client_fd = accept(fd, (struct sockaddr*)&addr, &addr_size);
57   // |fd| is only used for accept.
58   close(fd);
59 
60   if (client_fd < 0) {
61     log::error("Failed to accept: {}", strerror(errno));
62     codec_server.release();
63     task_ended.set_value();
64     return;
65   }
66 
67   std::array<uint8_t, kMaximumBufferSize> i_buf = {};
68   std::array<uint8_t, kMaximumBufferSize> o_buf = {};
69 
70   struct pollfd pfd;
71   pfd.fd = client_fd;
72   pfd.events = POLLIN;
73 
74   while (1) {
75     // Blocking poll.
76     int poll_ret = poll(&pfd, 1, -1);
77     if (poll_ret <= 0) {
78       log::error("Poll failed: {}", strerror(errno));
79       break;
80     }
81 
82     // Ignore remaining data in the closed socket.
83     if (pfd.revents & (POLLHUP | POLLNVAL)) {
84       log::info("Socket disconnected");
85       break;
86     }
87 
88     int i_data_len =
89         recv(client_fd, i_buf.data(), kMaximumBufferSize, MSG_NOSIGNAL);
90     if (i_data_len <= 0) {
91       log::error("Failed to recv data: {}", strerror(errno));
92       break;
93     }
94 
95     // Start transcode.
96     int o_data_len = codec_server->transcode(i_buf.data(), i_data_len,
97                                              o_buf.data(), kMaximumBufferSize);
98     if (o_data_len < 0) {
99       log::error("Failed to transcode: {}", strerror(-o_data_len));
100       break;
101     }
102 
103     int sent_rc = send(client_fd, o_buf.data(), o_data_len, MSG_NOSIGNAL);
104     if (sent_rc <= 0) {
105       log::error("Failed to send data: {}", strerror(errno));
106       break;
107     }
108     o_buf.fill(0);
109   }
110   close(client_fd);
111   unlink(addr.sun_path);
112   codec_server.release();
113   task_ended.set_value();
114   return;
115 }
116 
117 }  // namespace
118 
Service(base::OnceClosure shutdown_callback)119 Service::Service(base::OnceClosure shutdown_callback)
120     : shutdown_callback_(std::move(shutdown_callback)),
121       weak_ptr_factory_(this) {}
122 
Init()123 bool Service::Init() {
124   // Set up the dbus service.
125   dbus::Bus::Options opts;
126   opts.bus_type = dbus::Bus::SYSTEM;
127   bus_ = new dbus::Bus(std::move(opts));
128 
129   if (!bus_->Connect()) {
130     log::error("Failed to connect to system bus");
131     return false;
132   }
133 
134   exported_object_ = bus_->GetExportedObject(dbus::ObjectPath(kMmcServicePath));
135   if (!exported_object_) {
136     log::error("Failed to export {} object", kMmcServicePath);
137     return false;
138   }
139 
140   using ServiceMethod = void (Service::*)(dbus::MethodCall*,
141                                           dbus::ExportedObject::ResponseSender);
142   const std::map<const char*, ServiceMethod> kServiceMethods = {
143       {kCodecInitMethod, &Service::CodecInit},
144       {kCodecCleanUpMethod, &Service::CodecCleanUp},
145   };
146 
147   for (const auto& iter : kServiceMethods) {
148     bool ret = exported_object_->ExportMethodAndBlock(
149         kMmcServiceInterface, iter.first,
150         base::BindRepeating(iter.second, weak_ptr_factory_.GetWeakPtr()));
151     if (!ret) {
152       log::error("Failed to export method: {}", iter.first);
153       return false;
154     }
155   }
156 
157   if (!bus_->RequestOwnershipAndBlock(kMmcServiceName,
158                                       dbus::Bus::REQUIRE_PRIMARY)) {
159     log::error("Failed to take ownership of {}", kMmcServiceName);
160     return false;
161   }
162   return true;
163 }
164 
CodecInit(dbus::MethodCall * method_call,dbus::ExportedObject::ResponseSender sender)165 void Service::CodecInit(dbus::MethodCall* method_call,
166                         dbus::ExportedObject::ResponseSender sender) {
167   dbus::MessageReader reader(method_call);
168   auto dbus_response = dbus::Response::FromMethodCall(method_call);
169 
170   dbus::MessageWriter writer(dbus_response.get());
171 
172   CodecInitRequest request;
173   CodecInitResponse response;
174 
175   if (!reader.PopArrayOfBytesAsProto(&request)) {
176     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
177         method_call, kMmcServiceError,
178         "Unable to parse CodecInitRequest from message"));
179     return;
180   }
181 
182   if (!request.has_config()) {
183     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
184         method_call, kMmcServiceError, "'Config Param' must be set"));
185     return;
186   }
187 
188   // Create codec server instance.
189   std::unique_ptr<MmcInterface> codec_server;
190   if (request.config().has_hfp_lc3_decoder_param()) {
191     codec_server = std::make_unique<HfpLc3Decoder>();
192   } else if (request.config().has_hfp_lc3_encoder_param()) {
193     codec_server = std::make_unique<HfpLc3Encoder>();
194   }
195 #if !defined(EXCLUDE_NONSTANDARD_CODECS)
196   else if (request.config().has_a2dp_aac_encoder_param()) {
197     codec_server = std::make_unique<A2dpAacEncoder>();
198   }
199 #endif
200   else {
201     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
202         method_call, kMmcServiceError, "Codec type must be specified"));
203     return;
204   }
205 
206   int frame_size = codec_server->init(request.config());
207   if (frame_size < 0) {
208     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
209         method_call, kMmcServiceError,
210         "Init codec server failed: " + std::string(strerror(-frame_size))));
211     return;
212   }
213   response.set_input_frame_size(frame_size);
214 
215   // Generate socket name for client.
216   std::string socket_path =
217       std::string(kMmcSocketName) + base::UnguessableToken::Create().ToString();
218   response.set_socket_token(socket_path);
219 
220   int skt_fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
221   if (skt_fd < 0) {
222     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
223         method_call, kMmcServiceError,
224         "Create socket failed: " + std::string(strerror(errno))));
225     return;
226   }
227 
228   struct sockaddr_un addr = {};
229   addr.sun_family = AF_UNIX;
230   strncpy(addr.sun_path, response.socket_token().c_str(),
231           sizeof(addr.sun_path) - 1);
232   unlink(addr.sun_path);
233 
234   if (bind(skt_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) {
235     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
236         method_call, kMmcServiceError,
237         "Bind socket failed: " + std::string(strerror(errno))));
238     return;
239   }
240 
241   // mmc_service group can read/write the socket.
242   int rc = chmod(addr.sun_path, 0770);
243   if (rc < 0) {
244     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
245         method_call, kMmcServiceError,
246         "Chmod socket failed: " + std::string(strerror(errno))));
247     return;
248   }
249 
250   if (listen(skt_fd, kClientMaximum) == -1) {
251     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
252         method_call, kMmcServiceError,
253         "Listen socket failed: " + std::string(strerror(errno))));
254     return;
255   }
256 
257   // Create a thread and pass codec server and socket fd to it.
258   if (!StartWorkerThread(skt_fd, std::move(addr), std::move(codec_server))) {
259     std::move(sender).Run(dbus::ErrorResponse::FromMethodCall(
260         method_call, kMmcServiceError, "No free thread available"));
261     return;
262   }
263 
264   writer.AppendProtoAsArrayOfBytes(response);
265   std::move(sender).Run(std::move(dbus_response));
266   return;
267 }
268 
CodecCleanUp(dbus::MethodCall * method_call,dbus::ExportedObject::ResponseSender sender)269 void Service::CodecCleanUp(dbus::MethodCall* method_call,
270                            dbus::ExportedObject::ResponseSender sender) {
271   auto dbus_response = dbus::Response::FromMethodCall(method_call);
272   RemoveIdleThread();
273   std::move(sender).Run(std::move(dbus_response));
274   return;
275 }
276 
StartWorkerThread(int fd,struct sockaddr_un addr,std::unique_ptr<MmcInterface> codec_server)277 bool Service::StartWorkerThread(int fd, struct sockaddr_un addr,
278                                 std::unique_ptr<MmcInterface> codec_server) {
279   // Each thread has its associated future to indicate task completion.
280   std::promise<void> task_ended;
281   thread_pool_.push_back(std::make_pair(
282       std::make_unique<bluetooth::common::MessageLoopThread>(kWorkerThreadName),
283       std::make_unique<std::future<void>>(task_ended.get_future())));
284 
285   // Start up thread and assign task to it.
286   thread_pool_.back().first->StartUp();
287   if (!thread_pool_.back().first->IsRunning()) {
288     log::error("Failed to start thread");
289     return false;
290   }
291 
292   // Real-time scheduling increases thread priority.
293   // Without it, the thread still works.
294   if (!thread_pool_.back().first->EnableRealTimeScheduling()) {
295     log::warn("Failed to enable real time scheduling");
296   }
297 
298   if (!thread_pool_.back().first->DoInThread(
299           FROM_HERE,
300           base::BindOnce(&StartSocketListener, fd, std::move(addr),
301                          std::move(task_ended), std::move(codec_server)))) {
302     log::error("Failed to run task");
303     return false;
304   }
305 
306   return true;
307 }
308 
RemoveIdleThread()309 void Service::RemoveIdleThread() {
310   for (auto thread = thread_pool_.begin(); thread != thread_pool_.end();) {
311     if (thread->second->wait_for(std::chrono::milliseconds(
312             kThreadCheckTimeout)) == std::future_status::ready) {
313       // The task is over, close the thread and remove it from the thread pool.
314       thread->first->ShutDown();
315       thread = thread_pool_.erase(thread);
316     } else {
317       thread++;
318     }
319   }
320 }
321 
322 }  // namespace mmc
323