1 /* 2 * Copyright 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_ 18 #define CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_ 19 20 #include <map> 21 #include <memory> 22 #include <string> 23 #include <thread> 24 25 #include <android-base/logging.h> 26 #include <grpc++/grpc++.h> 27 28 #include "GrpcPrebuiltGraphService.grpc.pb.h" 29 #include "GrpcPrebuiltGraphService.pb.h" 30 #include "Options.pb.h" 31 #include "PrebuiltEngineInterface.h" 32 #include "PrebuiltGraph.h" 33 #include "RunnerComponent.h" 34 #include "gmock/gmock-matchers.h" 35 #include "gmock/gmock.h" 36 #include "gtest/gtest.h" 37 #include "types/Status.h" 38 39 namespace android { 40 namespace automotive { 41 namespace computepipe { 42 namespace graph { 43 44 constexpr char kGraphName[] = "Stub graph name"; 45 constexpr char kSetGraphConfigMessage[] = "Stub set config message"; 46 constexpr char kSetDebugOptionMessage[] = "Stub set debug option message"; 47 constexpr char kStartGraphMessage[] = "Stub start graph message"; 48 constexpr char kStopGraphMessage[] = "Stub stop graph message"; 49 constexpr char kOutputStreamPacket[] = "Stub output stream packet"; 50 constexpr char kResetGraphMessage[] = "ResetGraphMessage"; 51 52 // This is a barebones synchronous server implementation. A better implementation would be an 53 // asynchronous implementation and it is upto the graph provider to do that. This implementation 54 // is very specific to tests being conducted here. 55 class GrpcGraphServerImpl : public proto::GrpcGraphService::Service { 56 private: 57 std::string mServerAddress; 58 std::unique_ptr<::grpc::Server> mServer; 59 std::mutex mLock; 60 std::condition_variable mShutdownCv; 61 bool mShutdown = false; 62 63 public: GrpcGraphServerImpl(std::string address)64 explicit GrpcGraphServerImpl(std::string address) : mServerAddress(address) {} 65 ~GrpcGraphServerImpl()66 virtual ~GrpcGraphServerImpl() { 67 if (mServer) { 68 mServer->Shutdown(); 69 std::unique_lock lock(mLock); 70 if (!mShutdown) { 71 mShutdownCv.wait_for(lock, std::chrono::seconds(10), 72 [this]() { return mShutdown; }); 73 } 74 } 75 } 76 startServer()77 void startServer() { 78 if (mServer == nullptr) { 79 ::grpc::ServerBuilder builder; 80 builder.RegisterService(this); 81 builder.AddListeningPort(mServerAddress, ::grpc::InsecureServerCredentials()); 82 mServer = builder.BuildAndStart(); 83 mServer->Wait(); 84 std::lock_guard lock(mLock); 85 mShutdown = true; 86 mShutdownCv.notify_one(); 87 } 88 } 89 GetGraphOptions(::grpc::ServerContext * context,const proto::GraphOptionsRequest * request,proto::GraphOptionsResponse * response)90 ::grpc::Status GetGraphOptions(::grpc::ServerContext* context, 91 const proto::GraphOptionsRequest* request, 92 proto::GraphOptionsResponse* response) override { 93 proto::Options options; 94 options.set_graph_name(kGraphName); 95 response->set_serialized_options(options.SerializeAsString()); 96 return ::grpc::Status::OK; 97 } 98 SetGraphConfig(::grpc::ServerContext * context,const proto::SetGraphConfigRequest * request,proto::StatusResponse * response)99 ::grpc::Status SetGraphConfig(::grpc::ServerContext* context, 100 const proto::SetGraphConfigRequest* request, 101 proto::StatusResponse* response) override { 102 response->set_code(proto::RemoteGraphStatusCode::SUCCESS); 103 response->set_message(kSetGraphConfigMessage); 104 return ::grpc::Status::OK; 105 } 106 SetDebugOption(::grpc::ServerContext * context,const proto::SetDebugRequest * request,proto::StatusResponse * response)107 ::grpc::Status SetDebugOption(::grpc::ServerContext* context, 108 const proto::SetDebugRequest* request, 109 proto::StatusResponse* response) override { 110 response->set_code(proto::RemoteGraphStatusCode::SUCCESS); 111 response->set_message(kSetDebugOptionMessage); 112 return ::grpc::Status::OK; 113 } 114 StartGraphExecution(::grpc::ServerContext * context,const proto::StartGraphExecutionRequest * request,proto::StatusResponse * response)115 ::grpc::Status StartGraphExecution(::grpc::ServerContext* context, 116 const proto::StartGraphExecutionRequest* request, 117 proto::StatusResponse* response) override { 118 response->set_code(proto::RemoteGraphStatusCode::SUCCESS); 119 response->set_message(kStartGraphMessage); 120 return ::grpc::Status::OK; 121 } 122 ObserveOutputStream(::grpc::ServerContext * context,const proto::ObserveOutputStreamRequest * request,::grpc::ServerWriter<proto::OutputStreamResponse> * writer)123 ::grpc::Status ObserveOutputStream( 124 ::grpc::ServerContext* context, const proto::ObserveOutputStreamRequest* request, 125 ::grpc::ServerWriter<proto::OutputStreamResponse>* writer) override { 126 // Write as many output packets as stream id. This is just to test different number of 127 // packets received with each stream. Also write even numbered stream as a pixel packet 128 // and odd numbered stream as a data packet. 129 for (int i = 0; i < request->stream_id(); i++) { 130 proto::OutputStreamResponse response; 131 if (request->stream_id() % 2 == 0) { 132 response.mutable_pixel_data()->set_data(kOutputStreamPacket); 133 response.mutable_pixel_data()->set_height(1); 134 response.mutable_pixel_data()->set_width(sizeof(kOutputStreamPacket)); 135 response.mutable_pixel_data()->set_step(sizeof(kOutputStreamPacket)); 136 response.mutable_pixel_data()->set_format(proto::PixelFormat::GRAY); 137 EXPECT_TRUE(response.has_pixel_data()); 138 } else { 139 response.set_semantic_data(kOutputStreamPacket); 140 EXPECT_TRUE(response.has_semantic_data()); 141 } 142 if (!writer->Write(response)) { 143 return ::grpc::Status(::grpc::StatusCode::ABORTED, "Connection lost"); 144 } 145 } 146 147 return ::grpc::Status::OK; 148 } 149 StopGraphExecution(::grpc::ServerContext * context,const proto::StopGraphExecutionRequest * request,proto::StatusResponse * response)150 ::grpc::Status StopGraphExecution(::grpc::ServerContext* context, 151 const proto::StopGraphExecutionRequest* request, 152 proto::StatusResponse* response) override { 153 response->set_code(proto::RemoteGraphStatusCode::SUCCESS); 154 response->set_message(kStopGraphMessage); 155 return ::grpc::Status::OK; 156 } 157 ResetGraph(::grpc::ServerContext * context,const proto::ResetGraphRequest * request,proto::StatusResponse * response)158 ::grpc::Status ResetGraph(::grpc::ServerContext* context, 159 const proto::ResetGraphRequest* request, 160 proto::StatusResponse* response) override { 161 response->set_code(proto::RemoteGraphStatusCode::SUCCESS); 162 response->set_message(kResetGraphMessage); 163 return ::grpc::Status::OK; 164 } 165 GetProfilingData(::grpc::ServerContext * context,const proto::ProfilingDataRequest * request,proto::ProfilingDataResponse * response)166 ::grpc::Status GetProfilingData(::grpc::ServerContext* context, 167 const proto::ProfilingDataRequest* request, 168 proto::ProfilingDataResponse* response) { 169 response->set_data(kSetGraphConfigMessage); 170 return ::grpc::Status::OK; 171 } 172 }; 173 174 class PrebuiltEngineInterfaceImpl : public PrebuiltEngineInterface { 175 private: 176 std::map<int, int> mNumPacketsPerStream; 177 std::mutex mLock; 178 std::condition_variable mCv; 179 bool mGraphTerminated = false; 180 181 public: 182 // Prebuilt to engine interface DispatchPixelData(int streamId,int64_t timestamp,const runner::InputFrame & frame)183 void DispatchPixelData(int streamId, int64_t timestamp, 184 const runner::InputFrame& frame) override { 185 ASSERT_EQ(streamId % 2, 0); 186 std::lock_guard lock(mLock); 187 if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) { 188 mNumPacketsPerStream[streamId] = 1; 189 } else { 190 mNumPacketsPerStream[streamId]++; 191 } 192 } 193 DispatchSerializedData(int streamId,int64_t timestamp,std::string && data)194 void DispatchSerializedData(int streamId, int64_t timestamp, std::string&& data) override { 195 ASSERT_EQ(streamId % 2, 1); 196 std::lock_guard lock(mLock); 197 if (mNumPacketsPerStream.find(streamId) == mNumPacketsPerStream.end()) { 198 mNumPacketsPerStream[streamId] = 1; 199 } else { 200 mNumPacketsPerStream[streamId]++; 201 } 202 } 203 DispatchGraphTerminationMessage(Status status,std::string && msg)204 void DispatchGraphTerminationMessage(Status status, std::string&& msg) override { 205 std::lock_guard lock(mLock); 206 mGraphTerminated = true; 207 mCv.notify_one(); 208 } 209 waitForTermination()210 bool waitForTermination() { 211 std::unique_lock lock(mLock); 212 if (!mGraphTerminated) { 213 mCv.wait_for(lock, std::chrono::seconds(10), [this] { return mGraphTerminated; }); 214 } 215 return mGraphTerminated; 216 } 217 numPacketsForStream(int streamId)218 int numPacketsForStream(int streamId) { 219 std::lock_guard lock(mLock); 220 auto it = mNumPacketsPerStream.find(streamId); 221 if (it == mNumPacketsPerStream.end()) { 222 return 0; 223 } 224 return it->second; 225 } 226 }; 227 228 } // namespace graph 229 } // namespace computepipe 230 } // namespace automotive 231 } // namespace android 232 233 #endif // CPP_COMPUTEPIPE_TESTS_RUNNER_GRAPH_INCLUDES_GRPCGRAPHSERVERIMPL_H_ 234