1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <atomic>
20 #include <thread>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpcpp/security/server_credentials.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_context.h>
27 
28 #include "src/core/lib/gpr/host_port.h"
29 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
30 #include "test/cpp/qps/qps_server_builder.h"
31 #include "test/cpp/qps/server.h"
32 #include "test/cpp/qps/usage_timer.h"
33 
34 namespace grpc {
35 namespace testing {
36 
37 class BenchmarkServiceImpl final : public BenchmarkService::Service {
38  public:
UnaryCall(ServerContext * context,const SimpleRequest * request,SimpleResponse * response)39   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
40                    SimpleResponse* response) override {
41     auto s = SetResponse(request, response);
42     if (!s.ok()) {
43       return s;
44     }
45     return Status::OK;
46   }
StreamingCall(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)47   Status StreamingCall(
48       ServerContext* context,
49       ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
50     SimpleRequest request;
51     while (stream->Read(&request)) {
52       SimpleResponse response;
53       auto s = SetResponse(&request, &response);
54       if (!s.ok()) {
55         return s;
56       }
57       if (!stream->Write(response)) {
58         return Status(StatusCode::INTERNAL, "Server couldn't respond");
59       }
60     }
61     return Status::OK;
62   }
StreamingFromClient(ServerContext * context,ServerReader<SimpleRequest> * stream,SimpleResponse * response)63   Status StreamingFromClient(ServerContext* context,
64                              ServerReader<SimpleRequest>* stream,
65                              SimpleResponse* response) override {
66     auto s = ClientPull(context, stream, response);
67     if (!s.ok()) {
68       return s;
69     }
70     return Status::OK;
71   }
StreamingFromServer(ServerContext * context,const SimpleRequest * request,ServerWriter<SimpleResponse> * stream)72   Status StreamingFromServer(ServerContext* context,
73                              const SimpleRequest* request,
74                              ServerWriter<SimpleResponse>* stream) override {
75     SimpleResponse response;
76     auto s = SetResponse(request, &response);
77     if (!s.ok()) {
78       return s;
79     }
80     return ServerPush(context, stream, response, nullptr);
81   }
StreamingBothWays(ServerContext * context,ServerReaderWriter<SimpleResponse,SimpleRequest> * stream)82   Status StreamingBothWays(
83       ServerContext* context,
84       ServerReaderWriter<SimpleResponse, SimpleRequest>* stream) override {
85     // Read the first client message to setup server response
86     SimpleRequest request;
87     if (!stream->Read(&request)) {
88       return Status::OK;
89     }
90     SimpleResponse response;
91     auto s = SetResponse(&request, &response);
92     if (!s.ok()) {
93       return s;
94     }
95     std::atomic_bool done;
96     Status sp;
97     std::thread t([context, stream, &response, &done, &sp]() {
98       sp = ServerPush(context, stream, response, [&done]() {
99         return done.load(std::memory_order_relaxed);
100       });
101     });
102     SimpleResponse dummy;
103     auto cp = ClientPull(context, stream, &dummy);
104     done.store(true, std::memory_order_relaxed);  // can be lazy
105     t.join();
106     if (!cp.ok()) {
107       return cp;
108     }
109     if (!sp.ok()) {
110       return sp;
111     }
112     return Status::OK;
113   }
114 
115  private:
116   template <class R>
ClientPull(ServerContext * context,R * stream,SimpleResponse * response)117   static Status ClientPull(ServerContext* context, R* stream,
118                            SimpleResponse* response) {
119     SimpleRequest request;
120     while (stream->Read(&request)) {
121     }
122     if (request.response_size() > 0) {
123       if (!Server::SetPayload(request.response_type(), request.response_size(),
124                               response->mutable_payload())) {
125         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
126       }
127     }
128     return Status::OK;
129   }
130   template <class W>
ServerPush(ServerContext * context,W * stream,const SimpleResponse & response,const std::function<bool ()> & done)131   static Status ServerPush(ServerContext* context, W* stream,
132                            const SimpleResponse& response,
133                            const std::function<bool()>& done) {
134     while ((done == nullptr) || !done()) {
135       // TODO(vjpai): Add potential for rate-pacing on this
136       if (!stream->Write(response)) {
137         return Status(StatusCode::INTERNAL, "Server couldn't push");
138       }
139     }
140     return Status::OK;
141   }
SetResponse(const SimpleRequest * request,SimpleResponse * response)142   static Status SetResponse(const SimpleRequest* request,
143                             SimpleResponse* response) {
144     if (request->response_size() > 0) {
145       if (!Server::SetPayload(request->response_type(),
146                               request->response_size(),
147                               response->mutable_payload())) {
148         return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
149       }
150     }
151     return Status::OK;
152   }
153 };
154 
155 class SynchronousServer final : public grpc::testing::Server {
156  public:
SynchronousServer(const ServerConfig & config)157   explicit SynchronousServer(const ServerConfig& config) : Server(config) {
158     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
159 
160     auto port_num = port();
161     // Negative port number means inproc server, so no listen port needed
162     if (port_num >= 0) {
163       char* server_address = nullptr;
164       gpr_join_host_port(&server_address, "::", port_num);
165       builder->AddListeningPort(server_address,
166                                 Server::CreateServerCredentials(config));
167       gpr_free(server_address);
168     }
169 
170     ApplyConfigToBuilder(config, builder.get());
171 
172     builder->RegisterService(&service_);
173 
174     impl_ = builder->BuildAndStart();
175   }
176 
InProcessChannel(const ChannelArguments & args)177   std::shared_ptr<Channel> InProcessChannel(
178       const ChannelArguments& args) override {
179     return impl_->InProcessChannel(args);
180   }
181 
182  private:
183   BenchmarkServiceImpl service_;
184   std::unique_ptr<grpc::Server> impl_;
185 };
186 
CreateSynchronousServer(const ServerConfig & config)187 std::unique_ptr<grpc::testing::Server> CreateSynchronousServer(
188     const ServerConfig& config) {
189   return std::unique_ptr<Server>(new SynchronousServer(config));
190 }
191 
192 }  // namespace testing
193 }  // namespace grpc
194