1 /*
2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_
18 #define CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_
19 
20 #include <map>
21 #include <memory>
22 #include <string>
23 #include <thread>
24 
25 #include <android-base/logging.h>
26 #include <grpc++/grpc++.h>
27 
28 #include "GrpcPrebuiltGraphService.grpc.pb.h"
29 #include "GrpcPrebuiltGraphService.pb.h"
30 #include "Options.pb.h"
31 #include "PrebuiltEngineInterface.h"
32 #include "PrebuiltGraph.h"
33 #include "RunnerComponent.h"
34 #include "gmock/gmock-matchers.h"
35 #include "gmock/gmock.h"
36 #include "gtest/gtest.h"
37 #include "types/Status.h"
38 
39 namespace android {
40 namespace automotive {
41 namespace computepipe {
42 namespace graph {
43 
44 constexpr char kGraphName[] = "Stub graph name";
45 constexpr char kSetGraphConfigMessage[] = "Stub set config message";
46 constexpr char kSetDebugOptionMessage[] = "Stub set debug option message";
47 constexpr char kStartGraphMessage[] = "Stub start graph message";
48 constexpr char kStopGraphMessage[] = "Stub stop graph message";
49 constexpr char kOutputStreamPacket[] = "Stub output stream packet";
50 constexpr char kResetGraphMessage[] = "ResetGraphMessage";
51 
52 // This is a barebones synchronous server implementation. A better implementation would be an
53 // asynchronous implementation and it is upto the graph provider to do that. This implementation
54 // is very specific to tests being conducted here.
55 class GrpcGraphServerImpl : public proto::GrpcGraphService::Service {
56 private:
57     std::string mServerAddress;
58     std::unique_ptr<::grpc::Server> mServer;
59     std::mutex mLock;
60     std::condition_variable mShutdownCv;
61     bool mShutdown = false;
62 
63 public:
GrpcGraphServerImpl(std::string address)64     explicit GrpcGraphServerImpl(std::string address) : mServerAddress(address) {}
65 
~GrpcGraphServerImpl()66     virtual ~GrpcGraphServerImpl() {
67         if (mServer) {
68             mServer->Shutdown();
69             std::unique_lock lock(mLock);
70             if (!mShutdown) {
71                 mShutdownCv.wait_for(lock, std::chrono::seconds(10),
72                                      [this]() { return mShutdown; });
73             }
74         }
75     }
76 
startServer()77     void startServer() {
78         if (mServer == nullptr) {
79             ::grpc::ServerBuilder builder;
80             builder.RegisterService(this);
81             builder.AddListeningPort(mServerAddress, ::grpc::InsecureServerCredentials());
82             mServer = builder.BuildAndStart();
83             mServer->Wait();
84             std::lock_guard lock(mLock);
85             mShutdown = true;
86             mShutdownCv.notify_one();
87         }
88     }
89 
GetGraphOptions(::grpc::ServerContext * context,const proto::GraphOptionsRequest * request,proto::GraphOptionsResponse * response)90     ::grpc::Status GetGraphOptions(::grpc::ServerContext* context,
91                                    const proto::GraphOptionsRequest* request,
92                                    proto::GraphOptionsResponse* response) override {
93         proto::Options options;
94         options.set_graph_name(kGraphName);
95         response->set_serialized_options(options.SerializeAsString());
96         return ::grpc::Status::OK;
97     }
98 
SetGraphConfig(::grpc::ServerContext * context,const proto::SetGraphConfigRequest * request,proto::StatusResponse * response)99     ::grpc::Status SetGraphConfig(::grpc::ServerContext* context,
100                                   const proto::SetGraphConfigRequest* request,
101                                   proto::StatusResponse* response) override {
102         response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
103         response->set_message(kSetGraphConfigMessage);
104         return ::grpc::Status::OK;
105     }
106 
SetDebugOption(::grpc::ServerContext * context,const proto::SetDebugRequest * request,proto::StatusResponse * response)107     ::grpc::Status SetDebugOption(::grpc::ServerContext* context,
108                                   const proto::SetDebugRequest* request,
109                                   proto::StatusResponse* response) override {
110         response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
111         response->set_message(kSetDebugOptionMessage);
112         return ::grpc::Status::OK;
113     }
114 
StartGraphExecution(::grpc::ServerContext * context,const proto::StartGraphExecutionRequest * request,proto::StatusResponse * response)115     ::grpc::Status StartGraphExecution(::grpc::ServerContext* context,
116                                        const proto::StartGraphExecutionRequest* request,
117                                        proto::StatusResponse* response) override {
118         response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
119         response->set_message(kStartGraphMessage);
120         return ::grpc::Status::OK;
121     }
122 
ObserveOutputStream(::grpc::ServerContext * context,const proto::ObserveOutputStreamRequest * request,::grpc::ServerWriter<proto::OutputStreamResponse> * writer)123     ::grpc::Status ObserveOutputStream(
124             ::grpc::ServerContext* context, const proto::ObserveOutputStreamRequest* request,
125             ::grpc::ServerWriter<proto::OutputStreamResponse>* writer) override {
126         // Write as many output packets as stream id. This is just to test different number of
127         // packets received with each stream. Also write even numbered stream as a pixel packet
128         // and odd numbered stream as a data packet.
129         for (int i = 0; i < request->stream_id(); i++) {
130             proto::OutputStreamResponse response;
131             if (request->stream_id() % 2 == 0) {
132                 response.mutable_pixel_data()->set_data(kOutputStreamPacket);
133                 response.mutable_pixel_data()->set_height(1);
134                 response.mutable_pixel_data()->set_width(sizeof(kOutputStreamPacket));
135                 response.mutable_pixel_data()->set_step(sizeof(kOutputStreamPacket));
136                 response.mutable_pixel_data()->set_format(proto::PixelFormat::GRAY);
137                 EXPECT_TRUE(response.has_pixel_data());
138             } else {
139                 response.set_semantic_data(kOutputStreamPacket);
140                 EXPECT_TRUE(response.has_semantic_data());
141             }
142             if (!writer->Write(response)) {
143                 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost");
144             }
145         }
146 
147         return ::grpc::Status::OK;
148     }
149 
StopGraphExecution(::grpc::ServerContext * context,const proto::StopGraphExecutionRequest * request,proto::StatusResponse * response)150     ::grpc::Status StopGraphExecution(::grpc::ServerContext* context,
151                                       const proto::StopGraphExecutionRequest* request,
152                                       proto::StatusResponse* response) override {
153         response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
154         response->set_message(kStopGraphMessage);
155         return ::grpc::Status::OK;
156     }
157 
ResetGraph(::grpc::ServerContext * context,const proto::ResetGraphRequest * request,proto::StatusResponse * response)158     ::grpc::Status ResetGraph(::grpc::ServerContext* context,
159                               const proto::ResetGraphRequest* request,
160                               proto::StatusResponse* response) override {
161         response->set_code(proto::RemoteGraphStatusCode::SUCCESS);
162         response->set_message(kResetGraphMessage);
163         return ::grpc::Status::OK;
164     }
165 
GetProfilingData(::grpc::ServerContext * context,const proto::ProfilingDataRequest * request,proto::ProfilingDataResponse * response)166     ::grpc::Status GetProfilingData(::grpc::ServerContext* context,
167                                     const proto::ProfilingDataRequest* request,
168                                     proto::ProfilingDataResponse* response) {
169         response->set_data(kSetGraphConfigMessage);
170         return ::grpc::Status::OK;
171     }
172 };
173 
174 class PrebuiltEngineInterfaceImpl : public PrebuiltEngineInterface {
175 private:
176     std::map<int, int> mNumPacketsPerStream;
177     std::mutex mLock;
178     std::condition_variable mCv;
179     bool mGraphTerminated = false;
180 
181 public:
182     // Prebuilt to engine interface
DispatchPixelData(int streamId,int64_t timestamp,const runner::InputFrame & frame)183     void DispatchPixelData(int streamId, int64_t timestamp,
184                            const runner::InputFrame& frame) override {
185         ASSERT_EQ(streamId % 2, 0);
186         std::lock_guard lock(mLock);
187         if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) {
188             mNumPacketsPerStream[streamId] = 1;
189         } else {
190             mNumPacketsPerStream[streamId]++;
191         }
192     }
193 
DispatchSerializedData(int streamId,int64_t timestamp,std::string && data)194     void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& data) override {
195         ASSERT_EQ(streamId % 2, 1);
196         std::lock_guard lock(mLock);
197         if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) {
198             mNumPacketsPerStream[streamId] = 1;
199         } else {
200             mNumPacketsPerStream[streamId]++;
201         }
202     }
203 
DispatchGraphTerminationMessage(Status status,std::string && msg)204     void DispatchGraphTerminationMessage(Status status, std::string&& msg) override {
205         std::lock_guard lock(mLock);
206         mGraphTerminated = true;
207         mCv.notify_one();
208     }
209 
waitForTermination()210     bool waitForTermination() {
211         std::unique_lock lock(mLock);
212         if (!mGraphTerminated) {
213             mCv.wait_for(lock, std::chrono::seconds(10), [this] { return mGraphTerminated; });
214         }
215         return mGraphTerminated;
216     }
217 
numPacketsForStream(int streamId)218     int numPacketsForStream(int streamId) {
219         std::lock_guard lock(mLock);
220         auto it = mNumPacketsPerStream.find(streamId);
221         if (it == mNumPacketsPerStream.end()) {
222             return 0;
223         }
224         return it->second;
225     }
226 };
227 
228 }  // namespace graph
229 }  // namespace computepipe
230 }  // namespace automotive
231 }  // namespace android
232 
233 #endif  // CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_
234