1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/async_generic_service.h>
27 #include <grpcpp/generic/generic_stub.h>
28 #include <grpcpp/impl/codegen/proto_utils.h>
29 #include <grpcpp/server.h>
30 #include <grpcpp/server_builder.h>
31 #include <grpcpp/server_context.h>
32 #include <grpcpp/support/slice.h>
33 
34 #include "src/proto/grpc/testing/echo.grpc.pb.h"
35 #include "test/core/util/port.h"
36 #include "test/core/util/test_config.h"
37 #include "test/cpp/util/byte_buffer_proto_helper.h"
38 
39 #include <gtest/gtest.h>
40 
41 using grpc::testing::EchoRequest;
42 using grpc::testing::EchoResponse;
43 using std::chrono::system_clock;
44 
45 namespace grpc {
46 namespace testing {
47 namespace {
48 
// Converts a small integer into the opaque void* tag value that is handed to
// the completion-queue based async API and later compared in verify_ok().
// The integer is widened to intptr_t first so the pointer conversion is done
// on a pointer-sized value.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
50 
verify_ok(CompletionQueue * cq,int i,bool expect_ok)51 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
52   bool ok;
53   void* got_tag;
54   EXPECT_TRUE(cq->Next(&got_tag, &ok));
55   EXPECT_EQ(expect_ok, ok);
56   EXPECT_EQ(tag(i), got_tag);
57 }
58 
59 class GenericEnd2endTest : public ::testing::Test {
60  protected:
GenericEnd2endTest()61   GenericEnd2endTest() : server_host_("localhost") {}
62 
SetUp()63   void SetUp() override {
64     int port = grpc_pick_unused_port_or_die();
65     server_address_ << server_host_ << ":" << port;
66     // Setup server
67     ServerBuilder builder;
68     builder.AddListeningPort(server_address_.str(),
69                              InsecureServerCredentials());
70     builder.RegisterAsyncGenericService(&generic_service_);
71     // Include a second call to RegisterAsyncGenericService to make sure that
72     // we get an error in the log, since it is not allowed to have 2 async
73     // generic services
74     builder.RegisterAsyncGenericService(&generic_service_);
75     srv_cq_ = builder.AddCompletionQueue();
76     server_ = builder.BuildAndStart();
77   }
78 
TearDown()79   void TearDown() override {
80     server_->Shutdown();
81     void* ignored_tag;
82     bool ignored_ok;
83     cli_cq_.Shutdown();
84     srv_cq_->Shutdown();
85     while (cli_cq_.Next(&ignored_tag, &ignored_ok))
86       ;
87     while (srv_cq_->Next(&ignored_tag, &ignored_ok))
88       ;
89   }
90 
ResetStub()91   void ResetStub() {
92     std::shared_ptr<Channel> channel =
93         CreateChannel(server_address_.str(), InsecureChannelCredentials());
94     generic_stub_.reset(new GenericStub(channel));
95   }
96 
server_ok(int i)97   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
client_ok(int i)98   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
server_fail(int i)99   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
client_fail(int i)100   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
101 
SendRpc(int num_rpcs)102   void SendRpc(int num_rpcs) {
103     SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
104   }
105 
SendRpc(int num_rpcs,bool check_deadline,gpr_timespec deadline)106   void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
107     const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
108     for (int i = 0; i < num_rpcs; i++) {
109       EchoRequest send_request;
110       EchoRequest recv_request;
111       EchoResponse send_response;
112       EchoResponse recv_response;
113       Status recv_status;
114 
115       ClientContext cli_ctx;
116       GenericServerContext srv_ctx;
117       GenericServerAsyncReaderWriter stream(&srv_ctx);
118 
119       // The string needs to be long enough to test heap-based slice.
120       send_request.set_message("Hello world. Hello world. Hello world.");
121 
122       if (check_deadline) {
123         cli_ctx.set_deadline(deadline);
124       }
125 
126       std::unique_ptr<GenericClientAsyncReaderWriter> call =
127           generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
128       call->StartCall(tag(1));
129       client_ok(1);
130       std::unique_ptr<ByteBuffer> send_buffer =
131           SerializeToByteBuffer(&send_request);
132       call->Write(*send_buffer, tag(2));
133       // Send ByteBuffer can be destroyed after calling Write.
134       send_buffer.reset();
135       client_ok(2);
136       call->WritesDone(tag(3));
137       client_ok(3);
138 
139       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
140                                    srv_cq_.get(), tag(4));
141 
142       verify_ok(srv_cq_.get(), 4, true);
143       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
144       EXPECT_EQ(kMethodName, srv_ctx.method());
145 
146       if (check_deadline) {
147         EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
148                                      gpr_time_from_millis(1000, GPR_TIMESPAN)));
149       }
150 
151       ByteBuffer recv_buffer;
152       stream.Read(&recv_buffer, tag(5));
153       server_ok(5);
154       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
155       EXPECT_EQ(send_request.message(), recv_request.message());
156 
157       send_response.set_message(recv_request.message());
158       send_buffer = SerializeToByteBuffer(&send_response);
159       stream.Write(*send_buffer, tag(6));
160       send_buffer.reset();
161       server_ok(6);
162 
163       stream.Finish(Status::OK, tag(7));
164       server_ok(7);
165 
166       recv_buffer.Clear();
167       call->Read(&recv_buffer, tag(8));
168       client_ok(8);
169       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
170 
171       call->Finish(&recv_status, tag(9));
172       client_ok(9);
173 
174       EXPECT_EQ(send_response.message(), recv_response.message());
175       EXPECT_TRUE(recv_status.ok());
176     }
177   }
178 
179   CompletionQueue cli_cq_;
180   std::unique_ptr<ServerCompletionQueue> srv_cq_;
181   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
182   std::unique_ptr<grpc::GenericStub> generic_stub_;
183   std::unique_ptr<Server> server_;
184   AsyncGenericService generic_service_;
185   const grpc::string server_host_;
186   std::ostringstream server_address_;
187 };
188 
// A single echo round trip through the generic stub and generic service.
TEST_F(GenericEnd2endTest, SimpleRpc) {
  const int kNumRpcs = 1;
  ResetStub();
  SendRpc(kNumRpcs);
}
193 
// Ten echo round trips, one after another, on the same stub.
TEST_F(GenericEnd2endTest, SequentialRpcs) {
  const int kNumRpcs = 10;
  ResetStub();
  SendRpc(kNumRpcs);
}
198 
// Ten sequential echo calls issued through the generic unary client API
// (PrepareUnaryCall) and served through the generic streaming server API.
TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
  ResetStub();
  const int num_rpcs = 10;
  const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
  for (int i = 0; i < num_rpcs; i++) {
    EchoRequest send_request;
    EchoRequest recv_request;
    EchoResponse send_response;
    EchoResponse recv_response;
    Status recv_status;

    ClientContext cli_ctx;
    GenericServerContext srv_ctx;
    GenericServerAsyncReaderWriter stream(&srv_ctx);

    // The string needs to be long enough to test heap-based slice.
    send_request.set_message("Hello world. Hello world. Hello world.");

    std::unique_ptr<ByteBuffer> cli_send_buffer =
        SerializeToByteBuffer(&send_request);
    // Use the same cq as server so that events can be polled in time.
    std::unique_ptr<GenericClientAsyncResponseReader> call =
        generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName,
                                        *cli_send_buffer, srv_cq_.get());
    call->StartCall();
    ByteBuffer cli_recv_buffer;
    call->Finish(&cli_recv_buffer, &recv_status, tag(1));

    // Server side: accept the call and check what it was addressed to.
    generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
                                 srv_cq_.get(), tag(4));

    server_ok(4);
    EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
    EXPECT_EQ(kMethodName, srv_ctx.method());

    // Server side: read the request and echo its message back.
    ByteBuffer srv_recv_buffer;
    stream.Read(&srv_recv_buffer, tag(5));
    server_ok(5);
    EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request));
    EXPECT_EQ(send_request.message(), recv_request.message());

    send_response.set_message(recv_request.message());
    std::unique_ptr<ByteBuffer> srv_send_buffer =
        SerializeToByteBuffer(&send_response);
    stream.Write(*srv_send_buffer, tag(6));
    server_ok(6);

    stream.Finish(Status::OK, tag(7));
    server_ok(7);

    // The client's Finish event (tag 1) is delivered on srv_cq_ because the
    // call was prepared with that queue.
    server_ok(1);
    EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response));
    EXPECT_EQ(send_response.message(), recv_response.message());
    EXPECT_TRUE(recv_status.ok());
  }
}
255 
256 // One ping, one pong.
TEST_F(GenericEnd2endTest,SimpleBidiStreaming)257 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
258   ResetStub();
259 
260   const grpc::string kMethodName(
261       "/grpc.cpp.test.util.EchoTestService/BidiStream");
262   EchoRequest send_request;
263   EchoRequest recv_request;
264   EchoResponse send_response;
265   EchoResponse recv_response;
266   Status recv_status;
267   ClientContext cli_ctx;
268   GenericServerContext srv_ctx;
269   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
270 
271   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
272   send_request.set_message("Hello");
273   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
274       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
275   cli_stream->StartCall(tag(1));
276   client_ok(1);
277 
278   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
279                                srv_cq_.get(), tag(2));
280 
281   verify_ok(srv_cq_.get(), 2, true);
282   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
283   EXPECT_EQ(kMethodName, srv_ctx.method());
284 
285   std::unique_ptr<ByteBuffer> send_buffer =
286       SerializeToByteBuffer(&send_request);
287   cli_stream->Write(*send_buffer, tag(3));
288   send_buffer.reset();
289   client_ok(3);
290 
291   ByteBuffer recv_buffer;
292   srv_stream.Read(&recv_buffer, tag(4));
293   server_ok(4);
294   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
295   EXPECT_EQ(send_request.message(), recv_request.message());
296 
297   send_response.set_message(recv_request.message());
298   send_buffer = SerializeToByteBuffer(&send_response);
299   srv_stream.Write(*send_buffer, tag(5));
300   send_buffer.reset();
301   server_ok(5);
302 
303   cli_stream->Read(&recv_buffer, tag(6));
304   client_ok(6);
305   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
306   EXPECT_EQ(send_response.message(), recv_response.message());
307 
308   cli_stream->WritesDone(tag(7));
309   client_ok(7);
310 
311   srv_stream.Read(&recv_buffer, tag(8));
312   server_fail(8);
313 
314   srv_stream.Finish(Status::OK, tag(9));
315   server_ok(9);
316 
317   cli_stream->Finish(&recv_status, tag(10));
318   client_ok(10);
319 
320   EXPECT_EQ(send_response.message(), recv_response.message());
321   EXPECT_TRUE(recv_status.ok());
322 }
323 
// One echo round trip with a client deadline ten seconds from now; SendRpc
// checks the deadline observed by the server against it.
TEST_F(GenericEnd2endTest, Deadline) {
  ResetStub();
  const gpr_timespec ten_seconds = gpr_time_from_seconds(10, GPR_TIMESPAN);
  const gpr_timespec deadline =
      gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), ten_seconds);
  SendRpc(1, true, deadline);
}
330 
331 }  // namespace
332 }  // namespace testing
333 }  // namespace grpc
334 
main(int argc,char ** argv)335 int main(int argc, char** argv) {
336   grpc_test_init(argc, argv);
337   ::testing::InitGoogleTest(&argc, argv);
338   return RUN_ALL_TESTS();
339 }
340