1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "GrpcVehicleServer.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <shared_mutex>
21 
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24 
25 #include "GarageModeServerSideHandler.h"
26 #include "PowerStateListener.h"
27 #include "VehicleServer.grpc.pb.h"
28 #include "VehicleServer.pb.h"
29 #include "vhal_v2_0/DefaultConfig.h"
30 #include "vhal_v2_0/ProtoMessageConverter.h"
31 
32 namespace android {
33 namespace hardware {
34 namespace automotive {
35 namespace vehicle {
36 namespace V2_0 {
37 
38 namespace impl {
39 
40 class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service {
41   public:
42     explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo)
43         : mServiceAddr(serverInfo.getServerUri()),
44           mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool,
45                                                              serverInfo.powerStateMarkerFilePath)),
46           mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) {
47         setValuePool(&mValueObjectPool);
48     }
49 
50     // method from GrpcVehicleServer
51     GrpcVehicleServer& Start() override;
52 
53     void Wait() override;
54 
55     GrpcVehicleServer& Stop() override;
56 
57     uint32_t NumOfActivePropertyValueStream() override;
58 
59     // methods from IVehicleServer
60     void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override;
61 
62     StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override;
63 
64     // methods from vhal_proto::VehicleServer::Service
65 
66     ::grpc::Status GetAllPropertyConfig(
67             ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
68             ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override;
69 
70     ::grpc::Status SetProperty(::grpc::ServerContext* context,
71                                const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
72                                vhal_proto::VehicleHalCallStatus* status) override;
73 
74     ::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context,
75                                                  const ::google::protobuf::Empty*,
76                                                  ::google::protobuf::Empty*) override;
77 
78     ::grpc::Status StartPropertyValuesStream(
79             ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
80             ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override;
81 
82   private:
83     // We keep long-lasting connection for streaming the prop values.
84     // For us, each connection can be represented as a function to send the new value, and
85     // an ID to identify this connection
86     struct ConnectionDescriptor {
87         using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>;
88 
89         explicit ConnectionDescriptor(ValueWriterType&& value_writer)
90             : mValueWriter(std::move(value_writer)),
91               mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {}
92 
93         ConnectionDescriptor(const ConnectionDescriptor&) = delete;
94 
95         ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete;
96 
97         // This move constructor is NOT THREAD-SAFE, which means it cannot be moved
98         // while using. Since the connection descriptors are pretected by mConnectionMutex
99         // then we are fine here
100         ConnectionDescriptor(ConnectionDescriptor&& cd)
101             : mValueWriter(std::move(cd.mValueWriter)),
102               mConnectionID(cd.mConnectionID),
103               mIsAlive(cd.mIsAlive.load()) {
104             cd.mIsAlive.store(false);
105         }
106 
107         ValueWriterType mValueWriter;
108         uint64_t mConnectionID;
109         std::atomic<bool> mIsAlive{true};
110 
111         static std::atomic<uint64_t> CONNECTION_ID_COUNTER;
112     };
113 
114     std::string mServiceAddr;
115     std::unique_ptr<::grpc::Server> mServer{nullptr};
116     VehiclePropValuePool mValueObjectPool;
117     std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler;
118     PowerStateListener mPowerStateListener;
119     std::thread mPowerStateListenerThread{};
120     mutable std::shared_mutex mConnectionMutex;
121     mutable std::shared_mutex mWriterMutex;
122     std::list<ConnectionDescriptor> mValueStreamingConnections;
123 };
124 
125 std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0;
126 
127 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() {
128     // TODO(chenhaosjtuacm): get secured credentials here
129     return ::grpc::InsecureServerCredentials();
130 }
131 
132 GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) {
133     return std::make_unique<GrpcVehicleServerImpl>(serverInfo);
134 }
135 
136 GrpcVehicleServer& GrpcVehicleServerImpl::Start() {
137     if (mServer) {
138         LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started.";
139         return *this;
140     }
141 
142     ::grpc::ServerBuilder builder;
143     builder.RegisterService(this);
144     builder.AddListeningPort(mServiceAddr, getServerCredentials());
145     mServer = builder.BuildAndStart();
146 
147     CHECK(mServer) << __func__ << ": failed to create the GRPC server, "
148                    << "please make sure the configuration and permissions are correct";
149 
150     mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); });
151     return *this;
152 }
153 
154 void GrpcVehicleServerImpl::Wait() {
155     if (mServer) {
156         mServer->Wait();
157     }
158 
159     if (mPowerStateListenerThread.joinable()) {
160         mPowerStateListenerThread.join();
161     }
162 
163     mPowerStateListenerThread = {};
164     mServer.reset();
165 }
166 
167 GrpcVehicleServer& GrpcVehicleServerImpl::Stop() {
168     if (!mServer) {
169         LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started.";
170         return *this;
171     }
172 
173     mServer->Shutdown();
174     mPowerStateListener.Stop();
175     return *this;
176 }
177 
178 uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() {
179     std::shared_lock read_lock(mConnectionMutex);
180     return mValueStreamingConnections.size();
181 }
182 
183 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value,
184                                                    bool updateStatus) {
185     vhal_proto::WrappedVehiclePropValue wrappedPropValue;
186     proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value);
187     wrappedPropValue.set_update_status(updateStatus);
188     std::shared_lock read_lock(mConnectionMutex);
189 
190     bool has_terminated_connections = 0;
191 
192     for (auto& connection : mValueStreamingConnections) {
193         auto writeOK = connection.mValueWriter(wrappedPropValue);
194         if (!writeOK) {
195             LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: "
196                        << connection.mConnectionID;
197             has_terminated_connections = true;
198             connection.mIsAlive.store(false);
199         }
200     }
201 
202     if (!has_terminated_connections) {
203         return;
204     }
205 
206     read_lock.unlock();
207 
208     std::unique_lock write_lock(mConnectionMutex);
209 
210     for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) {
211         if (!itr->mIsAlive.load()) {
212             itr = mValueStreamingConnections.erase(itr);
213         } else {
214             ++itr;
215         }
216     }
217 }
218 
219 StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) {
220     if (value.prop == AP_POWER_STATE_REPORT &&
221         value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) {
222         mGarageModeHandler->HandleHeartbeat();
223     }
224     return GrpcVehicleServer::onSetProperty(value, updateStatus);
225 }
226 
227 ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig(
228         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
229         ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) {
230     auto configs = onGetAllPropertyConfig();
231     for (auto& config : configs) {
232         vhal_proto::VehiclePropConfig protoConfig;
233         proto_msg_converter::toProto(&protoConfig, config);
234         if (!stream->Write(protoConfig)) {
235             return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
236         }
237     }
238 
239     return ::grpc::Status::OK;
240 }
241 
242 ::grpc::Status GrpcVehicleServerImpl::SetProperty(
243         ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue,
244         vhal_proto::VehicleHalCallStatus* status) {
245     VehiclePropValue value;
246     proto_msg_converter::fromProto(&value, wrappedPropValue->value());
247 
248     auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status()));
249     if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) {
250         return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code");
251     }
252 
253     status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status));
254 
255     return ::grpc::Status::OK;
256 }
257 
258 ::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream(
259         ::grpc::ServerContext* context, const ::google::protobuf::Empty*,
260         ::google::protobuf::Empty*) {
261     sendAllValuesToClient();
262     return ::grpc::Status::OK;
263 }
264 
265 ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream(
266         ::grpc::ServerContext* context, const ::google::protobuf::Empty* request,
267         ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) {
268     std::mutex terminateMutex;
269     std::condition_variable terminateCV;
270     std::unique_lock<std::mutex> terminateLock(terminateMutex);
271     bool terminated{false};
272 
273     auto callBack = [stream, &terminateMutex, &terminateCV, &terminated,
274                      this](const vhal_proto::WrappedVehiclePropValue& value) {
275         std::unique_lock lock(mWriterMutex);
276         if (!stream->Write(value)) {
277             std::unique_lock<std::mutex> terminateLock(terminateMutex);
278             terminated = true;
279             terminateLock.unlock();
280             terminateCV.notify_all();
281             return false;
282         }
283         return true;
284     };
285 
286     // Register connection
287     std::unique_lock lock(mConnectionMutex);
288     auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack));
289     lock.unlock();
290 
291     // Never stop until connection lost
292     terminateCV.wait(terminateLock, [&terminated]() { return terminated; });
293 
294     LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID;
295 
296     return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost.");
297 }
298 
299 }  // namespace impl
300 
301 }  // namespace V2_0
302 }  // namespace vehicle
303 }  // namespace automotive
304 }  // namespace hardware
305 }  // namespace android
306