1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/ipc/client_impl.h"
18 
19 #include <fcntl.h>
20 #include <inttypes.h>
21 
22 #include <utility>
23 
24 #include "perfetto/base/task_runner.h"
25 #include "perfetto/ext/base/unix_socket.h"
26 #include "perfetto/ext/base/utils.h"
27 #include "perfetto/ext/ipc/service_descriptor.h"
28 #include "perfetto/ext/ipc/service_proxy.h"
29 
30 #include "protos/perfetto/ipc/wire_protocol.gen.h"
31 
32 // TODO(primiano): Add ThreadChecker everywhere.
33 
34 // TODO(primiano): Add timeouts.
35 
36 namespace perfetto {
37 namespace ipc {
38 
39 namespace {
40 constexpr base::SockFamily kClientSockFamily =
41     kUseTCPSocket ? base::SockFamily::kInet : base::SockFamily::kUnix;
42 }  // namespace
43 
44 // static
CreateInstance(ConnArgs conn_args,base::TaskRunner * task_runner)45 std::unique_ptr<Client> Client::CreateInstance(ConnArgs conn_args,
46                                                base::TaskRunner* task_runner) {
47   std::unique_ptr<Client> client(
48       new ClientImpl(std::move(conn_args), task_runner));
49   return client;
50 }
51 
ClientImpl(ConnArgs conn_args,base::TaskRunner * task_runner)52 ClientImpl::ClientImpl(ConnArgs conn_args, base::TaskRunner* task_runner)
53     : socket_name_(conn_args.socket_name),
54       socket_retry_(conn_args.retry),
55       task_runner_(task_runner),
56       weak_ptr_factory_(this) {
57   if (conn_args.socket_fd) {
58     // Create the client using a connected socket. This code path will never hit
59     // OnConnect().
60     sock_ = base::UnixSocket::AdoptConnected(
61         std::move(conn_args.socket_fd), this, task_runner_, kClientSockFamily,
62         base::SockType::kStream, base::SockPeerCredMode::kIgnore);
63   } else {
64     // Connect using the socket name.
65     TryConnect();
66   }
67 }
68 
~ClientImpl()69 ClientImpl::~ClientImpl() {
70   // Ensure we are not destroyed in the middle of invoking a reply.
71   PERFETTO_DCHECK(!invoking_method_reply_);
72   OnDisconnect(
73       nullptr);  // The base::UnixSocket* ptr is not used in OnDisconnect().
74 }
75 
TryConnect()76 void ClientImpl::TryConnect() {
77   PERFETTO_DCHECK(socket_name_);
78   sock_ = base::UnixSocket::Connect(socket_name_, this, task_runner_,
79                                     kClientSockFamily, base::SockType::kStream,
80                                     base::SockPeerCredMode::kIgnore);
81 }
82 
BindService(base::WeakPtr<ServiceProxy> service_proxy)83 void ClientImpl::BindService(base::WeakPtr<ServiceProxy> service_proxy) {
84   if (!service_proxy)
85     return;
86   if (!sock_->is_connected()) {
87     queued_bindings_.emplace_back(service_proxy);
88     return;
89   }
90   RequestID request_id = ++last_request_id_;
91   Frame frame;
92   frame.set_request_id(request_id);
93   Frame::BindService* req = frame.mutable_msg_bind_service();
94   const char* const service_name = service_proxy->GetDescriptor().service_name;
95   req->set_service_name(service_name);
96   if (!SendFrame(frame)) {
97     PERFETTO_DLOG("BindService(%s) failed", service_name);
98     return service_proxy->OnConnect(false /* success */);
99   }
100   QueuedRequest qr;
101   qr.type = Frame::kMsgBindServiceFieldNumber;
102   qr.request_id = request_id;
103   qr.service_proxy = service_proxy;
104   queued_requests_.emplace(request_id, std::move(qr));
105 }
106 
UnbindService(ServiceID service_id)107 void ClientImpl::UnbindService(ServiceID service_id) {
108   service_bindings_.erase(service_id);
109 }
110 
BeginInvoke(ServiceID service_id,const std::string & method_name,MethodID remote_method_id,const ProtoMessage & method_args,bool drop_reply,base::WeakPtr<ServiceProxy> service_proxy,int fd)111 RequestID ClientImpl::BeginInvoke(ServiceID service_id,
112                                   const std::string& method_name,
113                                   MethodID remote_method_id,
114                                   const ProtoMessage& method_args,
115                                   bool drop_reply,
116                                   base::WeakPtr<ServiceProxy> service_proxy,
117                                   int fd) {
118   RequestID request_id = ++last_request_id_;
119   Frame frame;
120   frame.set_request_id(request_id);
121   Frame::InvokeMethod* req = frame.mutable_msg_invoke_method();
122   req->set_service_id(service_id);
123   req->set_method_id(remote_method_id);
124   req->set_drop_reply(drop_reply);
125   req->set_args_proto(method_args.SerializeAsString());
126   if (!SendFrame(frame, fd)) {
127     PERFETTO_DLOG("BeginInvoke() failed while sending the frame");
128     return 0;
129   }
130   if (drop_reply)
131     return 0;
132   QueuedRequest qr;
133   qr.type = Frame::kMsgInvokeMethodFieldNumber;
134   qr.request_id = request_id;
135   qr.method_name = method_name;
136   qr.service_proxy = std::move(service_proxy);
137   queued_requests_.emplace(request_id, std::move(qr));
138   return request_id;
139 }
140 
SendFrame(const Frame & frame,int fd)141 bool ClientImpl::SendFrame(const Frame& frame, int fd) {
142   // Serialize the frame into protobuf, add the size header, and send it.
143   std::string buf = BufferedFrameDeserializer::Serialize(frame);
144 
145   // TODO(primiano): this should do non-blocking I/O. But then what if the
146   // socket buffer is full? We might want to either drop the request or throttle
147   // the send and PostTask the reply later? Right now we are making Send()
148   // blocking as a workaround. Propagate bakpressure to the caller instead.
149   bool res = sock_->Send(buf.data(), buf.size(), fd);
150   PERFETTO_CHECK(res || !sock_->is_connected());
151   return res;
152 }
153 
OnConnect(base::UnixSocket *,bool connected)154 void ClientImpl::OnConnect(base::UnixSocket*, bool connected) {
155   if (!connected && socket_retry_) {
156     socket_backoff_ms_ =
157         (socket_backoff_ms_ < 10000) ? socket_backoff_ms_ + 1000 : 30000;
158     PERFETTO_DLOG(
159         "Connection to traced's UNIX socket failed, retrying in %u seconds",
160         socket_backoff_ms_ / 1000);
161     auto weak_this = weak_ptr_factory_.GetWeakPtr();
162     task_runner_->PostDelayedTask(
163         [weak_this] {
164           if (weak_this)
165             static_cast<ClientImpl&>(*weak_this).TryConnect();
166         },
167         socket_backoff_ms_);
168     return;
169   }
170 
171   // Drain the BindService() calls that were queued before establishing the
172   // connection with the host. Note that if we got disconnected, the call to
173   // OnConnect below might delete |this|, so move everything on the stack first.
174   auto queued_bindings = std::move(queued_bindings_);
175   queued_bindings_.clear();
176   for (base::WeakPtr<ServiceProxy>& service_proxy : queued_bindings) {
177     if (connected) {
178       BindService(service_proxy);
179     } else if (service_proxy) {
180       service_proxy->OnConnect(false /* success */);
181     }
182   }
183   // Don't access |this| below here.
184 }
185 
OnDisconnect(base::UnixSocket *)186 void ClientImpl::OnDisconnect(base::UnixSocket*) {
187   for (const auto& it : service_bindings_) {
188     base::WeakPtr<ServiceProxy> service_proxy = it.second;
189     task_runner_->PostTask([service_proxy] {
190       if (service_proxy)
191         service_proxy->OnDisconnect();
192     });
193   }
194   service_bindings_.clear();
195   queued_bindings_.clear();
196 }
197 
OnDataAvailable(base::UnixSocket *)198 void ClientImpl::OnDataAvailable(base::UnixSocket*) {
199   size_t rsize;
200   do {
201     auto buf = frame_deserializer_.BeginReceive();
202     base::ScopedFile fd;
203     rsize = sock_->Receive(buf.data, buf.size, &fd);
204 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
205     PERFETTO_DCHECK(!fd);
206 #else
207     if (fd) {
208       PERFETTO_DCHECK(!received_fd_);
209       int res = fcntl(*fd, F_SETFD, FD_CLOEXEC);
210       PERFETTO_DCHECK(res == 0);
211       received_fd_ = std::move(fd);
212     }
213 #endif
214     if (!frame_deserializer_.EndReceive(rsize)) {
215       // The endpoint tried to send a frame that is way too large.
216       return sock_->Shutdown(true);  // In turn will trigger an OnDisconnect().
217       // TODO(fmayer): check this.
218     }
219   } while (rsize > 0);
220 
221   while (std::unique_ptr<Frame> frame = frame_deserializer_.PopNextFrame())
222     OnFrameReceived(*frame);
223 }
224 
OnFrameReceived(const Frame & frame)225 void ClientImpl::OnFrameReceived(const Frame& frame) {
226   auto queued_requests_it = queued_requests_.find(frame.request_id());
227   if (queued_requests_it == queued_requests_.end()) {
228     PERFETTO_DLOG("OnFrameReceived(): got invalid request_id=%" PRIu64,
229                   static_cast<uint64_t>(frame.request_id()));
230     return;
231   }
232   QueuedRequest req = std::move(queued_requests_it->second);
233   queued_requests_.erase(queued_requests_it);
234 
235   if (req.type == Frame::kMsgBindServiceFieldNumber &&
236       frame.has_msg_bind_service_reply()) {
237     return OnBindServiceReply(std::move(req), frame.msg_bind_service_reply());
238   }
239   if (req.type == Frame::kMsgInvokeMethodFieldNumber &&
240       frame.has_msg_invoke_method_reply()) {
241     return OnInvokeMethodReply(std::move(req), frame.msg_invoke_method_reply());
242   }
243   if (frame.has_msg_request_error()) {
244     PERFETTO_DLOG("Host error: %s", frame.msg_request_error().error().c_str());
245     return;
246   }
247 
248   PERFETTO_DLOG(
249       "OnFrameReceived() request type=%d, received unknown frame in reply to "
250       "request_id=%" PRIu64,
251       req.type, static_cast<uint64_t>(frame.request_id()));
252 }
253 
OnBindServiceReply(QueuedRequest req,const Frame::BindServiceReply & reply)254 void ClientImpl::OnBindServiceReply(QueuedRequest req,
255                                     const Frame::BindServiceReply& reply) {
256   base::WeakPtr<ServiceProxy>& service_proxy = req.service_proxy;
257   if (!service_proxy)
258     return;
259   const char* svc_name = service_proxy->GetDescriptor().service_name;
260   if (!reply.success()) {
261     PERFETTO_DLOG("BindService(): unknown service_name=\"%s\"", svc_name);
262     return service_proxy->OnConnect(false /* success */);
263   }
264 
265   auto prev_service = service_bindings_.find(reply.service_id());
266   if (prev_service != service_bindings_.end() && prev_service->second.get()) {
267     PERFETTO_DLOG(
268         "BindService(): Trying to bind service \"%s\" but another service "
269         "named \"%s\" is already bound with the same ID.",
270         svc_name, prev_service->second->GetDescriptor().service_name);
271     return service_proxy->OnConnect(false /* success */);
272   }
273 
274   // Build the method [name] -> [remote_id] map.
275   std::map<std::string, MethodID> methods;
276   for (const auto& method : reply.methods()) {
277     if (method.name().empty() || method.id() <= 0) {
278       PERFETTO_DLOG("OnBindServiceReply(): invalid method \"%s\" -> %" PRIu64,
279                     method.name().c_str(), static_cast<uint64_t>(method.id()));
280       continue;
281     }
282     methods[method.name()] = method.id();
283   }
284   service_proxy->InitializeBinding(weak_ptr_factory_.GetWeakPtr(),
285                                    reply.service_id(), std::move(methods));
286   service_bindings_[reply.service_id()] = service_proxy;
287   service_proxy->OnConnect(true /* success */);
288 }
289 
OnInvokeMethodReply(QueuedRequest req,const Frame::InvokeMethodReply & reply)290 void ClientImpl::OnInvokeMethodReply(QueuedRequest req,
291                                      const Frame::InvokeMethodReply& reply) {
292   base::WeakPtr<ServiceProxy> service_proxy = req.service_proxy;
293   if (!service_proxy)
294     return;
295   std::unique_ptr<ProtoMessage> decoded_reply;
296   if (reply.success()) {
297     // If this becomes a hotspot, optimize by maintaining a dedicated hashtable.
298     for (const auto& method : service_proxy->GetDescriptor().methods) {
299       if (req.method_name == method.name) {
300         decoded_reply = method.reply_proto_decoder(reply.reply_proto());
301         break;
302       }
303     }
304   }
305   const RequestID request_id = req.request_id;
306   invoking_method_reply_ = true;
307   service_proxy->EndInvoke(request_id, std::move(decoded_reply),
308                            reply.has_more());
309   invoking_method_reply_ = false;
310 
311   // If this is a streaming method and future replies will be resolved, put back
312   // the |req| with the callback into the set of active requests.
313   if (reply.has_more())
314     queued_requests_.emplace(request_id, std::move(req));
315 }
316 
317 ClientImpl::QueuedRequest::QueuedRequest() = default;
318 
TakeReceivedFD()319 base::ScopedFile ClientImpl::TakeReceivedFD() {
320   return std::move(received_fd_);
321 }
322 
323 }  // namespace ipc
324 }  // namespace perfetto
325