1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include "GrpcVehicleClient.h" 17 18 #include <condition_variable> 19 #include <mutex> 20 #include <thread> 21 22 #include <android-base/logging.h> 23 #include <grpc++/grpc++.h> 24 25 #include "VehicleServer.grpc.pb.h" 26 #include "VehicleServer.pb.h" 27 #include "vhal_v2_0/DefaultConfig.h" 28 #include "vhal_v2_0/ProtoMessageConverter.h" 29 30 namespace android { 31 namespace hardware { 32 namespace automotive { 33 namespace vehicle { 34 namespace V2_0 { 35 36 namespace impl { 37 38 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() { 39 // TODO(chenhaosjtuacm): get secured credentials here 40 return ::grpc::InsecureChannelCredentials(); 41 } 42 43 class GrpcVehicleClientImpl : public VehicleHalClient { 44 public: 45 explicit GrpcVehicleClientImpl(const std::string& addr) 46 : mServiceAddr(addr), 47 mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())), 48 mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) { 49 StartValuePollingThread(); 50 } 51 52 ~GrpcVehicleClientImpl() { 53 mShuttingDownFlag.store(true); 54 mShutdownCV.notify_all(); 55 56 if (mPollingThread.joinable()) { 57 mPollingThread.join(); 58 } 59 } 60 61 // methods from IVehicleClient 62 63 std::vector<VehiclePropConfig> getAllPropertyConfig() const override; 64 65 StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override; 66 67 // methods from VehicleHalClient 68 69 void triggerSendAllValues() override; 70 71 private: 72 void StartValuePollingThread(); 73 74 // private data members 75 76 std::string mServiceAddr; 77 std::shared_ptr<::grpc::Channel> mGrpcChannel; 78 std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub; 79 std::thread mPollingThread; 80 81 std::mutex mShutdownMutex; 82 std::condition_variable mShutdownCV; 83 std::atomic<bool> mShuttingDownFlag{false}; 84 }; 85 86 std::unique_ptr<VehicleHalClient> makeGrpcVehicleClient(const std::string& addr) { 87 return std::make_unique<GrpcVehicleClientImpl>(addr); 88 } 89 90 std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const { 91 std::vector<VehiclePropConfig> configs; 92 ::grpc::ClientContext context; 93 auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty()); 94 vhal_proto::VehiclePropConfig protoConfig; 95 while (config_stream->Read(&protoConfig)) { 96 VehiclePropConfig config; 97 proto_msg_converter::fromProto(&config, protoConfig); 98 configs.emplace_back(std::move(config)); 99 } 100 auto grpc_status = config_stream->Finish(); 101 if (!grpc_status.ok()) { 102 LOG(ERROR) << __func__ 103 << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message(); 104 configs.clear(); 105 } 106 107 return configs; 108 } 109 110 StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) { 111 ::grpc::ClientContext context; 112 vhal_proto::WrappedVehiclePropValue wrappedProtoValue; 113 vhal_proto::VehicleHalCallStatus vhal_status; 114 proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value); 115 wrappedProtoValue.set_update_status(updateStatus); 116 117 auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status); 118 if (!grpc_status.ok()) { 119 LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message(); 120 return StatusCode::INTERNAL_ERROR; 121 } 122 123 return static_cast<StatusCode>(vhal_status.status_code()); 124 } 125 126 void GrpcVehicleClientImpl::triggerSendAllValues() { 127 ::grpc::ClientContext context; 128 ::google::protobuf::Empty empty_response; 129 130 auto grpc_status = mGrpcStub->SendAllPropertyValuesToStream( 131 &context, ::google::protobuf::Empty(), &empty_response); 132 if (!grpc_status.ok()) { 133 LOG(ERROR) << __func__ << ": GRPC SendAllPropertyValuesToStream Failed: " 134 << grpc_status.error_message(); 135 } 136 } 137 138 void GrpcVehicleClientImpl::StartValuePollingThread() { 139 mPollingThread = std::thread([this]() { 140 while (!mShuttingDownFlag.load()) { 141 ::grpc::ClientContext context; 142 143 std::atomic<bool> rpc_ok{true}; 144 std::thread shuttingdown_watcher([this, &rpc_ok, &context]() { 145 std::unique_lock<std::mutex> shutdownLock(mShutdownMutex); 146 mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() { 147 return !rpc_ok.load() || mShuttingDownFlag.load(); 148 }); 149 context.TryCancel(); 150 }); 151 152 auto value_stream = 153 mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty()); 154 LOG(INFO) << __func__ << ": GRPC Value Streaming Started"; 155 vhal_proto::WrappedVehiclePropValue wrappedProtoValue; 156 while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) { 157 VehiclePropValue value; 158 proto_msg_converter::fromProto(&value, wrappedProtoValue.value()); 159 onPropertyValue(value, wrappedProtoValue.update_status()); 160 } 161 162 rpc_ok.store(false); 163 mShutdownCV.notify_all(); 164 shuttingdown_watcher.join(); 165 166 auto grpc_status = value_stream->Finish(); 167 // never reach here until connection lost 168 LOG(ERROR) << __func__ 169 << ": GRPC Value Streaming Failed: " << grpc_status.error_message(); 170 171 // try to reconnect 172 } 173 }); 174 } 175 176 } // namespace impl 177 178 } // namespace V2_0 179 } // namespace vehicle 180 } // namespace automotive 181 } // namespace hardware 182 } // namespace android 183