1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include "GrpcVehicleServer.h" 17 18 #include <condition_variable> 19 #include <mutex> 20 #include <shared_mutex> 21 22 #include <android-base/logging.h> 23 #include <grpc++/grpc++.h> 24 25 #include "GarageModeServerSideHandler.h" 26 #include "PowerStateListener.h" 27 #include "VehicleServer.grpc.pb.h" 28 #include "VehicleServer.pb.h" 29 #include "vhal_v2_0/DefaultConfig.h" 30 #include "vhal_v2_0/ProtoMessageConverter.h" 31 32 namespace android { 33 namespace hardware { 34 namespace automotive { 35 namespace vehicle { 36 namespace V2_0 { 37 38 namespace impl { 39 40 class GrpcVehicleServerImpl : public GrpcVehicleServer, public vhal_proto::VehicleServer::Service { 41 public: 42 explicit GrpcVehicleServerImpl(const VirtualizedVhalServerInfo& serverInfo) 43 : mServiceAddr(serverInfo.getServerUri()), 44 mGarageModeHandler(makeGarageModeServerSideHandler(this, &mValueObjectPool, 45 serverInfo.powerStateMarkerFilePath)), 46 mPowerStateListener(serverInfo.powerStateSocket, serverInfo.powerStateMarkerFilePath) { 47 setValuePool(&mValueObjectPool); 48 } 49 50 // method from GrpcVehicleServer 51 GrpcVehicleServer& Start() override; 52 53 void Wait() override; 54 55 GrpcVehicleServer& Stop() override; 56 57 uint32_t NumOfActivePropertyValueStream() override; 58 59 // methods from IVehicleServer 60 void onPropertyValueFromCar(const VehiclePropValue& value, bool updateStatus) override; 61 62 StatusCode onSetProperty(const VehiclePropValue& value, bool updateStatus) override; 63 64 // methods from vhal_proto::VehicleServer::Service 65 66 ::grpc::Status GetAllPropertyConfig( 67 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, 68 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) override; 69 70 ::grpc::Status SetProperty(::grpc::ServerContext* context, 71 const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, 72 vhal_proto::VehicleHalCallStatus* status) override; 73 74 ::grpc::Status SendAllPropertyValuesToStream(::grpc::ServerContext* context, 75 const ::google::protobuf::Empty*, 76 ::google::protobuf::Empty*) override; 77 78 ::grpc::Status StartPropertyValuesStream( 79 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, 80 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) override; 81 82 private: 83 // We keep long-lasting connection for streaming the prop values. 84 // For us, each connection can be represented as a function to send the new value, and 85 // an ID to identify this connection 86 struct ConnectionDescriptor { 87 using ValueWriterType = std::function<bool(const vhal_proto::WrappedVehiclePropValue&)>; 88 89 explicit ConnectionDescriptor(ValueWriterType&& value_writer) 90 : mValueWriter(std::move(value_writer)), 91 mConnectionID(CONNECTION_ID_COUNTER.fetch_add(1)) {} 92 93 ConnectionDescriptor(const ConnectionDescriptor&) = delete; 94 95 ConnectionDescriptor& operator=(const ConnectionDescriptor&) = delete; 96 97 // This move constructor is NOT THREAD-SAFE, which means it cannot be moved 98 // while using. Since the connection descriptors are pretected by mConnectionMutex 99 // then we are fine here 100 ConnectionDescriptor(ConnectionDescriptor&& cd) 101 : mValueWriter(std::move(cd.mValueWriter)), 102 mConnectionID(cd.mConnectionID), 103 mIsAlive(cd.mIsAlive.load()) { 104 cd.mIsAlive.store(false); 105 } 106 107 ValueWriterType mValueWriter; 108 uint64_t mConnectionID; 109 std::atomic<bool> mIsAlive{true}; 110 111 static std::atomic<uint64_t> CONNECTION_ID_COUNTER; 112 }; 113 114 std::string mServiceAddr; 115 std::unique_ptr<::grpc::Server> mServer{nullptr}; 116 VehiclePropValuePool mValueObjectPool; 117 std::unique_ptr<GarageModeServerSideHandler> mGarageModeHandler; 118 PowerStateListener mPowerStateListener; 119 std::thread mPowerStateListenerThread{}; 120 mutable std::shared_mutex mConnectionMutex; 121 mutable std::shared_mutex mWriterMutex; 122 std::list<ConnectionDescriptor> mValueStreamingConnections; 123 }; 124 125 std::atomic<uint64_t> GrpcVehicleServerImpl::ConnectionDescriptor::CONNECTION_ID_COUNTER = 0; 126 127 static std::shared_ptr<::grpc::ServerCredentials> getServerCredentials() { 128 // TODO(chenhaosjtuacm): get secured credentials here 129 return ::grpc::InsecureServerCredentials(); 130 } 131 132 GrpcVehicleServerPtr makeGrpcVehicleServer(const VirtualizedVhalServerInfo& serverInfo) { 133 return std::make_unique<GrpcVehicleServerImpl>(serverInfo); 134 } 135 136 GrpcVehicleServer& GrpcVehicleServerImpl::Start() { 137 if (mServer) { 138 LOG(WARNING) << __func__ << ": GrpcVehicleServer has already started."; 139 return *this; 140 } 141 142 ::grpc::ServerBuilder builder; 143 builder.RegisterService(this); 144 builder.AddListeningPort(mServiceAddr, getServerCredentials()); 145 mServer = builder.BuildAndStart(); 146 147 CHECK(mServer) << __func__ << ": failed to create the GRPC server, " 148 << "please make sure the configuration and permissions are correct"; 149 150 mPowerStateListenerThread = std::thread([this]() { mPowerStateListener.Listen(); }); 151 return *this; 152 } 153 154 void GrpcVehicleServerImpl::Wait() { 155 if (mServer) { 156 mServer->Wait(); 157 } 158 159 if (mPowerStateListenerThread.joinable()) { 160 mPowerStateListenerThread.join(); 161 } 162 163 mPowerStateListenerThread = {}; 164 mServer.reset(); 165 } 166 167 GrpcVehicleServer& GrpcVehicleServerImpl::Stop() { 168 if (!mServer) { 169 LOG(WARNING) << __func__ << ": GrpcVehicleServer has not started."; 170 return *this; 171 } 172 173 mServer->Shutdown(); 174 mPowerStateListener.Stop(); 175 return *this; 176 } 177 178 uint32_t GrpcVehicleServerImpl::NumOfActivePropertyValueStream() { 179 std::shared_lock read_lock(mConnectionMutex); 180 return mValueStreamingConnections.size(); 181 } 182 183 void GrpcVehicleServerImpl::onPropertyValueFromCar(const VehiclePropValue& value, 184 bool updateStatus) { 185 vhal_proto::WrappedVehiclePropValue wrappedPropValue; 186 proto_msg_converter::toProto(wrappedPropValue.mutable_value(), value); 187 wrappedPropValue.set_update_status(updateStatus); 188 std::shared_lock read_lock(mConnectionMutex); 189 190 bool has_terminated_connections = 0; 191 192 for (auto& connection : mValueStreamingConnections) { 193 auto writeOK = connection.mValueWriter(wrappedPropValue); 194 if (!writeOK) { 195 LOG(ERROR) << __func__ << ": Server Write failed, connection lost. ID: " 196 << connection.mConnectionID; 197 has_terminated_connections = true; 198 connection.mIsAlive.store(false); 199 } 200 } 201 202 if (!has_terminated_connections) { 203 return; 204 } 205 206 read_lock.unlock(); 207 208 std::unique_lock write_lock(mConnectionMutex); 209 210 for (auto itr = mValueStreamingConnections.begin(); itr != mValueStreamingConnections.end();) { 211 if (!itr->mIsAlive.load()) { 212 itr = mValueStreamingConnections.erase(itr); 213 } else { 214 ++itr; 215 } 216 } 217 } 218 219 StatusCode GrpcVehicleServerImpl::onSetProperty(const VehiclePropValue& value, bool updateStatus) { 220 if (value.prop == AP_POWER_STATE_REPORT && 221 value.value.int32Values[0] == toInt(VehicleApPowerStateReport::SHUTDOWN_POSTPONE)) { 222 mGarageModeHandler->HandleHeartbeat(); 223 } 224 return GrpcVehicleServer::onSetProperty(value, updateStatus); 225 } 226 227 ::grpc::Status GrpcVehicleServerImpl::GetAllPropertyConfig( 228 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, 229 ::grpc::ServerWriter<vhal_proto::VehiclePropConfig>* stream) { 230 auto configs = onGetAllPropertyConfig(); 231 for (auto& config : configs) { 232 vhal_proto::VehiclePropConfig protoConfig; 233 proto_msg_converter::toProto(&protoConfig, config); 234 if (!stream->Write(protoConfig)) { 235 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); 236 } 237 } 238 239 return ::grpc::Status::OK; 240 } 241 242 ::grpc::Status GrpcVehicleServerImpl::SetProperty( 243 ::grpc::ServerContext* context, const vhal_proto::WrappedVehiclePropValue* wrappedPropValue, 244 vhal_proto::VehicleHalCallStatus* status) { 245 VehiclePropValue value; 246 proto_msg_converter::fromProto(&value, wrappedPropValue->value()); 247 248 auto set_status = static_cast<int32_t>(onSetProperty(value, wrappedPropValue->update_status())); 249 if (!vhal_proto::VehicleHalStatusCode_IsValid(set_status)) { 250 return ::grpc::Status(::grpc::StatusCode::INTERNAL, "Unknown status code"); 251 } 252 253 status->set_status_code(static_cast<vhal_proto::VehicleHalStatusCode>(set_status)); 254 255 return ::grpc::Status::OK; 256 } 257 258 ::grpc::Status GrpcVehicleServerImpl::SendAllPropertyValuesToStream( 259 ::grpc::ServerContext* context, const ::google::protobuf::Empty*, 260 ::google::protobuf::Empty*) { 261 sendAllValuesToClient(); 262 return ::grpc::Status::OK; 263 } 264 265 ::grpc::Status GrpcVehicleServerImpl::StartPropertyValuesStream( 266 ::grpc::ServerContext* context, const ::google::protobuf::Empty* request, 267 ::grpc::ServerWriter<vhal_proto::WrappedVehiclePropValue>* stream) { 268 std::mutex terminateMutex; 269 std::condition_variable terminateCV; 270 std::unique_lock<std::mutex> terminateLock(terminateMutex); 271 bool terminated{false}; 272 273 auto callBack = [stream, &terminateMutex, &terminateCV, &terminated, 274 this](const vhal_proto::WrappedVehiclePropValue& value) { 275 std::unique_lock lock(mWriterMutex); 276 if (!stream->Write(value)) { 277 std::unique_lock<std::mutex> terminateLock(terminateMutex); 278 terminated = true; 279 terminateLock.unlock(); 280 terminateCV.notify_all(); 281 return false; 282 } 283 return true; 284 }; 285 286 // Register connection 287 std::unique_lock lock(mConnectionMutex); 288 auto& conn = mValueStreamingConnections.emplace_back(std::move(callBack)); 289 lock.unlock(); 290 291 // Never stop until connection lost 292 terminateCV.wait(terminateLock, [&terminated]() { return terminated; }); 293 294 LOG(ERROR) << __func__ << ": Stream lost, ID : " << conn.mConnectionID; 295 296 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost."); 297 } 298 299 } // namespace impl 300 301 } // namespace V2_0 302 } // namespace vehicle 303 } // namespace automotive 304 } // namespace hardware 305 } // namespace android 306