1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/generic/async_generic_service.h>
30 #include <grpcpp/resource_quota.h>
31 #include <grpcpp/security/server_credentials.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36 
37 #include "src/core/lib/gpr/host_port.h"
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/qps/qps_server_builder.h"
42 #include "test/cpp/qps/server.h"
43 
44 namespace grpc {
45 namespace testing {
46 
47 template <class RequestType, class ResponseType, class ServiceType,
48           class ServerContextType>
49 class AsyncQpsServerTest final : public grpc::testing::Server {
50  public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)51   AsyncQpsServerTest(
52       const ServerConfig& config,
53       std::function<void(ServerBuilder*, ServiceType*)> register_service,
54       std::function<void(ServiceType*, ServerContextType*, RequestType*,
55                          ServerAsyncResponseWriter<ResponseType>*,
56                          CompletionQueue*, ServerCompletionQueue*, void*)>
57           request_unary_function,
58       std::function<void(ServiceType*, ServerContextType*,
59                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
60                          CompletionQueue*, ServerCompletionQueue*, void*)>
61           request_streaming_function,
62       std::function<void(ServiceType*, ServerContextType*,
63                          ServerAsyncReader<ResponseType, RequestType>*,
64                          CompletionQueue*, ServerCompletionQueue*, void*)>
65           request_streaming_from_client_function,
66       std::function<void(ServiceType*, ServerContextType*, RequestType*,
67                          ServerAsyncWriter<ResponseType>*, CompletionQueue*,
68                          ServerCompletionQueue*, void*)>
69           request_streaming_from_server_function,
70       std::function<void(ServiceType*, ServerContextType*,
71                          ServerAsyncReaderWriter<ResponseType, RequestType>*,
72                          CompletionQueue*, ServerCompletionQueue*, void*)>
73           request_streaming_both_ways_function,
74       std::function<grpc::Status(const PayloadConfig&, RequestType*,
75                                  ResponseType*)>
76           process_rpc)
77       : Server(config) {
78     std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
79 
80     auto port_num = port();
81     // Negative port number means inproc server, so no listen port needed
82     if (port_num >= 0) {
83       char* server_address = nullptr;
84       gpr_join_host_port(&server_address, "::", port_num);
85       builder->AddListeningPort(server_address,
86                                 Server::CreateServerCredentials(config));
87       gpr_free(server_address);
88     }
89 
90     register_service(builder.get(), &async_service_);
91 
92     int num_threads = config.async_server_threads();
93     if (num_threads <= 0) {  // dynamic sizing
94       num_threads = cores();
95       gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
96     }
97 
98     int tpc = std::max(1, config.threads_per_cq());  // 1 if unspecified
99     int num_cqs = (num_threads + tpc - 1) / tpc;     // ceiling operator
100     for (int i = 0; i < num_cqs; i++) {
101       srv_cqs_.emplace_back(builder->AddCompletionQueue());
102     }
103     for (int i = 0; i < num_threads; i++) {
104       cq_.emplace_back(i % srv_cqs_.size());
105     }
106 
107     ApplyConfigToBuilder(config, builder.get());
108 
109     server_ = builder->BuildAndStart();
110 
111     auto process_rpc_bound =
112         std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
113                   std::placeholders::_2);
114 
115     for (int i = 0; i < 5000; i++) {
116       for (int j = 0; j < num_cqs; j++) {
117         if (request_unary_function) {
118           auto request_unary = std::bind(
119               request_unary_function, &async_service_, std::placeholders::_1,
120               std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
121               srv_cqs_[j].get(), std::placeholders::_4);
122           contexts_.emplace_back(
123               new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
124         }
125         if (request_streaming_function) {
126           auto request_streaming = std::bind(
127               request_streaming_function, &async_service_,
128               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
129               srv_cqs_[j].get(), std::placeholders::_3);
130           contexts_.emplace_back(new ServerRpcContextStreamingImpl(
131               request_streaming, process_rpc_bound));
132         }
133         if (request_streaming_from_client_function) {
134           auto request_streaming_from_client = std::bind(
135               request_streaming_from_client_function, &async_service_,
136               std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
137               srv_cqs_[j].get(), std::placeholders::_3);
138           contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
139               request_streaming_from_client, process_rpc_bound));
140         }
141         if (request_streaming_from_server_function) {
142           auto request_streaming_from_server =
143               std::bind(request_streaming_from_server_function, &async_service_,
144                         std::placeholders::_1, std::placeholders::_2,
145                         std::placeholders::_3, srv_cqs_[j].get(),
146                         srv_cqs_[j].get(), std::placeholders::_4);
147           contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
148               request_streaming_from_server, process_rpc_bound));
149         }
150         if (request_streaming_both_ways_function) {
151           // TODO(vjpai): Add this code
152         }
153       }
154     }
155 
156     for (int i = 0; i < num_threads; i++) {
157       shutdown_state_.emplace_back(new PerThreadShutdownState());
158       threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
159     }
160   }
~AsyncQpsServerTest()161   ~AsyncQpsServerTest() {
162     for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
163       std::lock_guard<std::mutex> lock((*ss)->mutex);
164       (*ss)->shutdown = true;
165     }
166     std::thread shutdown_thread(&AsyncQpsServerTest::ShutdownThreadFunc, this);
167     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
168       (*cq)->Shutdown();
169     }
170     for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
171       thr->join();
172     }
173     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
174       bool ok;
175       void* got_tag;
176       while ((*cq)->Next(&got_tag, &ok))
177         ;
178     }
179     shutdown_thread.join();
180   }
181 
GetPollCount()182   int GetPollCount() override {
183     int count = 0;
184     for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
185       count += grpc_get_cq_poll_num((*cq)->cq());
186     }
187     return count;
188   }
189 
InProcessChannel(const ChannelArguments & args)190   std::shared_ptr<Channel> InProcessChannel(
191       const ChannelArguments& args) override {
192     return server_->InProcessChannel(args);
193   }
194 
195  private:
ShutdownThreadFunc()196   void ShutdownThreadFunc() {
197     // TODO (vpai): Remove this deadline and allow Shutdown to finish properly
198     auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(3);
199     server_->Shutdown(deadline);
200   }
201 
ThreadFunc(int thread_idx)202   void ThreadFunc(int thread_idx) {
203     // Wait until work is available or we are shutting down
204     bool ok;
205     void* got_tag;
206     if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
207       return;
208     }
209     ServerRpcContext* ctx;
210     std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
211     do {
212       ctx = detag(got_tag);
213       // The tag is a pointer to an RPC context to invoke
214       // Proceed while holding a lock to make sure that
215       // this thread isn't supposed to shut down
216       mu_ptr->lock();
217       if (shutdown_state_[thread_idx]->shutdown) {
218         mu_ptr->unlock();
219         return;
220       }
221     } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
222         [&, ctx, ok, mu_ptr]() {
223           ctx->lock();
224           if (!ctx->RunNextState(ok)) {
225             ctx->Reset();
226           }
227           ctx->unlock();
228           mu_ptr->unlock();
229         },
230         &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
231   }
232 
233   class ServerRpcContext {
234    public:
ServerRpcContext()235     ServerRpcContext() {}
lock()236     void lock() { mu_.lock(); }
unlock()237     void unlock() { mu_.unlock(); }
~ServerRpcContext()238     virtual ~ServerRpcContext(){};
239     virtual bool RunNextState(bool) = 0;  // next state, return false if done
240     virtual void Reset() = 0;             // start this back at a clean state
241    private:
242     std::mutex mu_;
243   };
tag(ServerRpcContext * func)244   static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)245   static ServerRpcContext* detag(void* tag) {
246     return static_cast<ServerRpcContext*>(tag);
247   }
248 
249   class ServerRpcContextUnaryImpl final : public ServerRpcContext {
250    public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)251     ServerRpcContextUnaryImpl(
252         std::function<void(ServerContextType*, RequestType*,
253                            grpc::ServerAsyncResponseWriter<ResponseType>*,
254                            void*)>
255             request_method,
256         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
257         : srv_ctx_(new ServerContextType),
258           next_state_(&ServerRpcContextUnaryImpl::invoker),
259           request_method_(request_method),
260           invoke_method_(invoke_method),
261           response_writer_(srv_ctx_.get()) {
262       request_method_(srv_ctx_.get(), &req_, &response_writer_,
263                       AsyncQpsServerTest::tag(this));
264     }
~ServerRpcContextUnaryImpl()265     ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)266     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()267     void Reset() override {
268       srv_ctx_.reset(new ServerContextType);
269       req_ = RequestType();
270       response_writer_ =
271           grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
272 
273       // Then request the method
274       next_state_ = &ServerRpcContextUnaryImpl::invoker;
275       request_method_(srv_ctx_.get(), &req_, &response_writer_,
276                       AsyncQpsServerTest::tag(this));
277     }
278 
279    private:
finisher(bool)280     bool finisher(bool) { return false; }
invoker(bool ok)281     bool invoker(bool ok) {
282       if (!ok) {
283         return false;
284       }
285 
286       // Call the RPC processing function
287       grpc::Status status = invoke_method_(&req_, &response_);
288 
289       // Have the response writer work and invoke on_finish when done
290       next_state_ = &ServerRpcContextUnaryImpl::finisher;
291       response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
292       return true;
293     }
294     std::unique_ptr<ServerContextType> srv_ctx_;
295     RequestType req_;
296     ResponseType response_;
297     bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
298     std::function<void(ServerContextType*, RequestType*,
299                        grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
300         request_method_;
301     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
302     grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
303   };
304 
305   class ServerRpcContextStreamingImpl final : public ServerRpcContext {
306    public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)307     ServerRpcContextStreamingImpl(
308         std::function<void(
309             ServerContextType*,
310             grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
311             request_method,
312         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
313         : srv_ctx_(new ServerContextType),
314           next_state_(&ServerRpcContextStreamingImpl::request_done),
315           request_method_(request_method),
316           invoke_method_(invoke_method),
317           stream_(srv_ctx_.get()) {
318       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
319     }
~ServerRpcContextStreamingImpl()320     ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)321     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()322     void Reset() override {
323       srv_ctx_.reset(new ServerContextType);
324       req_ = RequestType();
325       stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
326           srv_ctx_.get());
327 
328       // Then request the method
329       next_state_ = &ServerRpcContextStreamingImpl::request_done;
330       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
331     }
332 
333    private:
request_done(bool ok)334     bool request_done(bool ok) {
335       if (!ok) {
336         return false;
337       }
338       next_state_ = &ServerRpcContextStreamingImpl::read_done;
339       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
340       return true;
341     }
342 
read_done(bool ok)343     bool read_done(bool ok) {
344       if (ok) {
345         // invoke the method
346         // Call the RPC processing function
347         grpc::Status status = invoke_method_(&req_, &response_);
348         // initiate the write
349         next_state_ = &ServerRpcContextStreamingImpl::write_done;
350         stream_.Write(response_, AsyncQpsServerTest::tag(this));
351       } else {  // client has sent writes done
352         // finish the stream
353         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
354         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
355       }
356       return true;
357     }
write_done(bool ok)358     bool write_done(bool ok) {
359       // now go back and get another streaming read!
360       if (ok) {
361         next_state_ = &ServerRpcContextStreamingImpl::read_done;
362         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
363       } else {
364         next_state_ = &ServerRpcContextStreamingImpl::finish_done;
365         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
366       }
367       return true;
368     }
finish_done(bool ok)369     bool finish_done(bool ok) { return false; /* reset the context */ }
370 
371     std::unique_ptr<ServerContextType> srv_ctx_;
372     RequestType req_;
373     ResponseType response_;
374     bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
375     std::function<void(
376         ServerContextType*,
377         grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
378         request_method_;
379     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
380     grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
381   };
382 
383   class ServerRpcContextStreamingFromClientImpl final
384       : public ServerRpcContext {
385    public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)386     ServerRpcContextStreamingFromClientImpl(
387         std::function<void(ServerContextType*,
388                            grpc::ServerAsyncReader<ResponseType, RequestType>*,
389                            void*)>
390             request_method,
391         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
392         : srv_ctx_(new ServerContextType),
393           next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
394           request_method_(request_method),
395           invoke_method_(invoke_method),
396           stream_(srv_ctx_.get()) {
397       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
398     }
~ServerRpcContextStreamingFromClientImpl()399     ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)400     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()401     void Reset() override {
402       srv_ctx_.reset(new ServerContextType);
403       req_ = RequestType();
404       stream_ =
405           grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
406 
407       // Then request the method
408       next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
409       request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
410     }
411 
412    private:
request_done(bool ok)413     bool request_done(bool ok) {
414       if (!ok) {
415         return false;
416       }
417       next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
418       stream_.Read(&req_, AsyncQpsServerTest::tag(this));
419       return true;
420     }
421 
read_done(bool ok)422     bool read_done(bool ok) {
423       if (ok) {
424         // In this case, just do another read
425         // next_state_ is unchanged
426         stream_.Read(&req_, AsyncQpsServerTest::tag(this));
427         return true;
428       } else {  // client has sent writes done
429         // invoke the method
430         // Call the RPC processing function
431         grpc::Status status = invoke_method_(&req_, &response_);
432         // finish the stream
433         next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
434         stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
435       }
436       return true;
437     }
finish_done(bool ok)438     bool finish_done(bool ok) { return false; /* reset the context */ }
439 
440     std::unique_ptr<ServerContextType> srv_ctx_;
441     RequestType req_;
442     ResponseType response_;
443     bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
444     std::function<void(ServerContextType*,
445                        grpc::ServerAsyncReader<ResponseType, RequestType>*,
446                        void*)>
447         request_method_;
448     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
449     grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
450   };
451 
452   class ServerRpcContextStreamingFromServerImpl final
453       : public ServerRpcContext {
454    public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)455     ServerRpcContextStreamingFromServerImpl(
456         std::function<void(ServerContextType*, RequestType*,
457                            grpc::ServerAsyncWriter<ResponseType>*, void*)>
458             request_method,
459         std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
460         : srv_ctx_(new ServerContextType),
461           next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
462           request_method_(request_method),
463           invoke_method_(invoke_method),
464           stream_(srv_ctx_.get()) {
465       request_method_(srv_ctx_.get(), &req_, &stream_,
466                       AsyncQpsServerTest::tag(this));
467     }
~ServerRpcContextStreamingFromServerImpl()468     ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)469     bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()470     void Reset() override {
471       srv_ctx_.reset(new ServerContextType);
472       req_ = RequestType();
473       stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
474 
475       // Then request the method
476       next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
477       request_method_(srv_ctx_.get(), &req_, &stream_,
478                       AsyncQpsServerTest::tag(this));
479     }
480 
481    private:
request_done(bool ok)482     bool request_done(bool ok) {
483       if (!ok) {
484         return false;
485       }
486       // invoke the method
487       // Call the RPC processing function
488       grpc::Status status = invoke_method_(&req_, &response_);
489 
490       next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
491       stream_.Write(response_, AsyncQpsServerTest::tag(this));
492       return true;
493     }
494 
write_done(bool ok)495     bool write_done(bool ok) {
496       if (ok) {
497         // Do another write!
498         // next_state_ is unchanged
499         stream_.Write(response_, AsyncQpsServerTest::tag(this));
500       } else {  // must be done so let's finish
501         next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
502         stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
503       }
504       return true;
505     }
finish_done(bool ok)506     bool finish_done(bool ok) { return false; /* reset the context */ }
507 
508     std::unique_ptr<ServerContextType> srv_ctx_;
509     RequestType req_;
510     ResponseType response_;
511     bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
512     std::function<void(ServerContextType*, RequestType*,
513                        grpc::ServerAsyncWriter<ResponseType>*, void*)>
514         request_method_;
515     std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
516     grpc::ServerAsyncWriter<ResponseType> stream_;
517   };
518 
519   std::vector<std::thread> threads_;
520   std::unique_ptr<grpc::Server> server_;
521   std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
522   std::vector<int> cq_;
523   ServiceType async_service_;
524   std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
525 
526   struct PerThreadShutdownState {
527     mutable std::mutex mutex;
528     bool shutdown;
PerThreadShutdownStategrpc::testing::AsyncQpsServerTest::PerThreadShutdownState529     PerThreadShutdownState() : shutdown(false) {}
530   };
531 
532   std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
533 };
534 
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)535 static void RegisterBenchmarkService(ServerBuilder* builder,
536                                      BenchmarkService::AsyncService* service) {
537   builder->RegisterService(service);
538 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)539 static void RegisterGenericService(ServerBuilder* builder,
540                                    grpc::AsyncGenericService* service) {
541   builder->RegisterAsyncGenericService(service);
542 }
543 
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)544 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
545                                SimpleResponse* response) {
546   if (request->response_size() > 0) {
547     if (!Server::SetPayload(request->response_type(), request->response_size(),
548                             response->mutable_payload())) {
549       return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
550     }
551   }
552   // We are done using the request. Clear it to reduce working memory.
553   // This proves to reduce cache misses in large message size cases.
554   request->Clear();
555   return Status::OK;
556 }
557 
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)558 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
559                                 ByteBuffer* request, ByteBuffer* response) {
560   // We are done using the request. Clear it to reduce working memory.
561   // This proves to reduce cache misses in large message size cases.
562   request->Clear();
563   int resp_size = payload_config.bytebuf_params().resp_size();
564   std::unique_ptr<char[]> buf(new char[resp_size]);
565   Slice slice(buf.get(), resp_size);
566   *response = ByteBuffer(&slice, 1);
567   return Status::OK;
568 }
569 
CreateAsyncServer(const ServerConfig & config)570 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
571   return std::unique_ptr<Server>(
572       new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
573                              BenchmarkService::AsyncService,
574                              grpc::ServerContext>(
575           config, RegisterBenchmarkService,
576           &BenchmarkService::AsyncService::RequestUnaryCall,
577           &BenchmarkService::AsyncService::RequestStreamingCall,
578           &BenchmarkService::AsyncService::RequestStreamingFromClient,
579           &BenchmarkService::AsyncService::RequestStreamingFromServer,
580           &BenchmarkService::AsyncService::RequestStreamingBothWays,
581           ProcessSimpleRPC));
582 }
CreateAsyncGenericServer(const ServerConfig & config)583 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
584   return std::unique_ptr<Server>(
585       new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
586                              grpc::GenericServerContext>(
587           config, RegisterGenericService, nullptr,
588           &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
589           ProcessGenericRPC));
590 }
591 
592 }  // namespace testing
593 }  // namespace grpc
594