1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "GrpcVehicleClient.h"
17 
18 #include <condition_variable>
19 #include <mutex>
20 #include <thread>
21 
22 #include <android-base/logging.h>
23 #include <grpc++/grpc++.h>
24 
25 #include "VehicleServer.grpc.pb.h"
26 #include "VehicleServer.pb.h"
27 #include "vhal_v2_0/DefaultConfig.h"
28 #include "vhal_v2_0/ProtoMessageConverter.h"
29 
30 namespace android {
31 namespace hardware {
32 namespace automotive {
33 namespace vehicle {
34 namespace V2_0 {
35 
36 namespace impl {
37 
getChannelCredentials()38 static std::shared_ptr<::grpc::ChannelCredentials> getChannelCredentials() {
39     // TODO(chenhaosjtuacm): get secured credentials here
40     return ::grpc::InsecureChannelCredentials();
41 }
42 
43 class GrpcVehicleClientImpl : public VehicleHalClient {
44   public:
GrpcVehicleClientImpl(const std::string & addr)45     explicit GrpcVehicleClientImpl(const std::string& addr)
46         : mServiceAddr(addr),
47           mGrpcChannel(::grpc::CreateChannel(mServiceAddr, getChannelCredentials())),
48           mGrpcStub(vhal_proto::VehicleServer::NewStub(mGrpcChannel)) {
49         StartValuePollingThread();
50     }
51 
~GrpcVehicleClientImpl()52     ~GrpcVehicleClientImpl() {
53         mShuttingDownFlag.store(true);
54         mShutdownCV.notify_all();
55 
56         if (mPollingThread.joinable()) {
57             mPollingThread.join();
58         }
59     }
60 
61     // methods from IVehicleClient
62 
63     std::vector<VehiclePropConfig> getAllPropertyConfig() const override;
64 
65     StatusCode setProperty(const VehiclePropValue& value, bool updateStatus) override;
66 
67     // methods from VehicleHalClient
68 
69     void triggerSendAllValues() override;
70 
71   private:
72     void StartValuePollingThread();
73 
74     // private data members
75 
76     std::string mServiceAddr;
77     std::shared_ptr<::grpc::Channel> mGrpcChannel;
78     std::unique_ptr<vhal_proto::VehicleServer::Stub> mGrpcStub;
79     std::thread mPollingThread;
80 
81     std::mutex mShutdownMutex;
82     std::condition_variable mShutdownCV;
83     std::atomic<bool> mShuttingDownFlag{false};
84 };
85 
makeGrpcVehicleClient(const std::string & addr)86 std::unique_ptr<VehicleHalClient> makeGrpcVehicleClient(const std::string& addr) {
87     return std::make_unique<GrpcVehicleClientImpl>(addr);
88 }
89 
getAllPropertyConfig() const90 std::vector<VehiclePropConfig> GrpcVehicleClientImpl::getAllPropertyConfig() const {
91     std::vector<VehiclePropConfig> configs;
92     ::grpc::ClientContext context;
93     auto config_stream = mGrpcStub->GetAllPropertyConfig(&context, ::google::protobuf::Empty());
94     vhal_proto::VehiclePropConfig protoConfig;
95     while (config_stream->Read(&protoConfig)) {
96         VehiclePropConfig config;
97         proto_msg_converter::fromProto(&config, protoConfig);
98         configs.emplace_back(std::move(config));
99     }
100     auto grpc_status = config_stream->Finish();
101     if (!grpc_status.ok()) {
102         LOG(ERROR) << __func__
103                    << ": GRPC GetAllPropertyConfig Failed: " << grpc_status.error_message();
104         configs.clear();
105     }
106 
107     return configs;
108 }
109 
setProperty(const VehiclePropValue & value,bool updateStatus)110 StatusCode GrpcVehicleClientImpl::setProperty(const VehiclePropValue& value, bool updateStatus) {
111     ::grpc::ClientContext context;
112     vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
113     vhal_proto::VehicleHalCallStatus vhal_status;
114     proto_msg_converter::toProto(wrappedProtoValue.mutable_value(), value);
115     wrappedProtoValue.set_update_status(updateStatus);
116 
117     auto grpc_status = mGrpcStub->SetProperty(&context, wrappedProtoValue, &vhal_status);
118     if (!grpc_status.ok()) {
119         LOG(ERROR) << __func__ << ": GRPC SetProperty Failed: " << grpc_status.error_message();
120         return StatusCode::INTERNAL_ERROR;
121     }
122 
123     return static_cast<StatusCode>(vhal_status.status_code());
124 }
125 
triggerSendAllValues()126 void GrpcVehicleClientImpl::triggerSendAllValues() {
127     ::grpc::ClientContext context;
128     ::google::protobuf::Empty empty_response;
129 
130     auto grpc_status = mGrpcStub->SendAllPropertyValuesToStream(
131             &context, ::google::protobuf::Empty(), &empty_response);
132     if (!grpc_status.ok()) {
133         LOG(ERROR) << __func__ << ": GRPC SendAllPropertyValuesToStream Failed: "
134                    << grpc_status.error_message();
135     }
136 }
137 
StartValuePollingThread()138 void GrpcVehicleClientImpl::StartValuePollingThread() {
139     mPollingThread = std::thread([this]() {
140         while (!mShuttingDownFlag.load()) {
141             ::grpc::ClientContext context;
142 
143             std::atomic<bool> rpc_ok{true};
144             std::thread shuttingdown_watcher([this, &rpc_ok, &context]() {
145                 std::unique_lock<std::mutex> shutdownLock(mShutdownMutex);
146                 mShutdownCV.wait(shutdownLock, [this, &rpc_ok]() {
147                     return !rpc_ok.load() || mShuttingDownFlag.load();
148                 });
149                 context.TryCancel();
150             });
151 
152             auto value_stream =
153                     mGrpcStub->StartPropertyValuesStream(&context, ::google::protobuf::Empty());
154             LOG(INFO) << __func__ << ": GRPC Value Streaming Started";
155             vhal_proto::WrappedVehiclePropValue wrappedProtoValue;
156             while (!mShuttingDownFlag.load() && value_stream->Read(&wrappedProtoValue)) {
157                 VehiclePropValue value;
158                 proto_msg_converter::fromProto(&value, wrappedProtoValue.value());
159                 onPropertyValue(value, wrappedProtoValue.update_status());
160             }
161 
162             rpc_ok.store(false);
163             mShutdownCV.notify_all();
164             shuttingdown_watcher.join();
165 
166             auto grpc_status = value_stream->Finish();
167             // never reach here until connection lost
168             LOG(ERROR) << __func__
169                        << ": GRPC Value Streaming Failed: " << grpc_status.error_message();
170 
171             // try to reconnect
172         }
173     });
174 }
175 
176 }  // namespace impl
177 
178 }  // namespace V2_0
179 }  // namespace vehicle
180 }  // namespace automotive
181 }  // namespace hardware
182 }  // namespace android
183