1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* Benchmark gRPC end2end in various configurations */
20 
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PUMP_H
23 
24 #include <benchmark/benchmark.h>
25 #include <sstream>
26 #include "src/core/lib/profiling/timers.h"
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
30 
31 namespace grpc {
32 namespace testing {
33 
34 /*******************************************************************************
35  * BENCHMARKING KERNELS
36  */
37 
tag(intptr_t x)38 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
39 
40 template <class Fixture>
BM_PumpStreamClientToServer(benchmark::State & state)41 static void BM_PumpStreamClientToServer(benchmark::State& state) {
42   EchoTestService::AsyncService service;
43   std::unique_ptr<Fixture> fixture(new Fixture(&service));
44   {
45     EchoRequest send_request;
46     EchoRequest recv_request;
47     if (state.range(0) > 0) {
48       send_request.set_message(std::string(state.range(0), 'a'));
49     }
50     Status recv_status;
51     ServerContext svr_ctx;
52     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
53     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
54                               fixture->cq(), tag(0));
55     std::unique_ptr<EchoTestService::Stub> stub(
56         EchoTestService::NewStub(fixture->channel()));
57     ClientContext cli_ctx;
58     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
59     int need_tags = (1 << 0) | (1 << 1);
60     void* t;
61     bool ok;
62     while (need_tags) {
63       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
64       GPR_ASSERT(ok);
65       int i = static_cast<int>((intptr_t)t);
66       GPR_ASSERT(need_tags & (1 << i));
67       need_tags &= ~(1 << i);
68     }
69     response_rw.Read(&recv_request, tag(0));
70     while (state.KeepRunning()) {
71       GPR_TIMER_SCOPE("BenchmarkCycle", 0);
72       request_rw->Write(send_request, tag(1));
73       while (true) {
74         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
75         if (t == tag(0)) {
76           response_rw.Read(&recv_request, tag(0));
77         } else if (t == tag(1)) {
78           break;
79         } else {
80           GPR_ASSERT(false);
81         }
82       }
83     }
84     request_rw->WritesDone(tag(1));
85     need_tags = (1 << 0) | (1 << 1);
86     while (need_tags) {
87       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
88       int i = static_cast<int>((intptr_t)t);
89       GPR_ASSERT(need_tags & (1 << i));
90       need_tags &= ~(1 << i);
91     }
92     response_rw.Finish(Status::OK, tag(0));
93     Status final_status;
94     request_rw->Finish(&final_status, tag(1));
95     need_tags = (1 << 0) | (1 << 1);
96     while (need_tags) {
97       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
98       int i = static_cast<int>((intptr_t)t);
99       GPR_ASSERT(need_tags & (1 << i));
100       need_tags &= ~(1 << i);
101     }
102     GPR_ASSERT(final_status.ok());
103   }
104   fixture->Finish(state);
105   fixture.reset();
106   state.SetBytesProcessed(state.range(0) * state.iterations());
107 }
108 
109 template <class Fixture>
BM_PumpStreamServerToClient(benchmark::State & state)110 static void BM_PumpStreamServerToClient(benchmark::State& state) {
111   EchoTestService::AsyncService service;
112   std::unique_ptr<Fixture> fixture(new Fixture(&service));
113   {
114     EchoResponse send_response;
115     EchoResponse recv_response;
116     if (state.range(0) > 0) {
117       send_response.set_message(std::string(state.range(0), 'a'));
118     }
119     Status recv_status;
120     ServerContext svr_ctx;
121     ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
122     service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
123                               fixture->cq(), tag(0));
124     std::unique_ptr<EchoTestService::Stub> stub(
125         EchoTestService::NewStub(fixture->channel()));
126     ClientContext cli_ctx;
127     auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
128     int need_tags = (1 << 0) | (1 << 1);
129     void* t;
130     bool ok;
131     while (need_tags) {
132       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
133       GPR_ASSERT(ok);
134       int i = static_cast<int>((intptr_t)t);
135       GPR_ASSERT(need_tags & (1 << i));
136       need_tags &= ~(1 << i);
137     }
138     request_rw->Read(&recv_response, tag(0));
139     while (state.KeepRunning()) {
140       GPR_TIMER_SCOPE("BenchmarkCycle", 0);
141       response_rw.Write(send_response, tag(1));
142       while (true) {
143         GPR_ASSERT(fixture->cq()->Next(&t, &ok));
144         if (t == tag(0)) {
145           request_rw->Read(&recv_response, tag(0));
146         } else if (t == tag(1)) {
147           break;
148         } else {
149           GPR_ASSERT(false);
150         }
151       }
152     }
153     response_rw.Finish(Status::OK, tag(1));
154     need_tags = (1 << 0) | (1 << 1);
155     while (need_tags) {
156       GPR_ASSERT(fixture->cq()->Next(&t, &ok));
157       int i = static_cast<int>((intptr_t)t);
158       GPR_ASSERT(need_tags & (1 << i));
159       need_tags &= ~(1 << i);
160     }
161   }
162   fixture->Finish(state);
163   fixture.reset();
164   state.SetBytesProcessed(state.range(0) * state.iterations());
165 }
166 }  // namespace testing
167 }  // namespace grpc
168 
169 #endif  // TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H
170