1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
27 
28 #include "src/core/lib/gpr/tls.h"
29 #include "src/core/lib/iomgr/port.h"
30 #include "src/proto/grpc/testing/echo.grpc.pb.h"
31 #include "test/core/util/port.h"
32 #include "test/core/util/test_config.h"
33 
34 #ifdef GRPC_POSIX_SOCKET
35 #include "src/core/lib/iomgr/ev_posix.h"
36 #endif  // GRPC_POSIX_SOCKET
37 
38 #include <gtest/gtest.h>
39 
40 #ifdef GRPC_POSIX_SOCKET
41 // Thread-local variable to so that only polls from this test assert
42 // non-blocking (not polls from resolver, timer thread, etc)
43 GPR_TLS_DECL(g_is_nonblocking_test);
44 
45 namespace {
46 
maybe_assert_non_blocking_poll(struct pollfd * pfds,nfds_t nfds,int timeout)47 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
48                                    int timeout) {
49   if (gpr_tls_get(&g_is_nonblocking_test)) {
50     GPR_ASSERT(timeout == 0);
51   }
52   return poll(pfds, nfds, timeout);
53 }
54 
55 }  // namespace
56 
57 namespace grpc {
58 namespace testing {
59 namespace {
60 
tag(int i)61 void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
detag(void * p)62 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
63 
64 class NonblockingTest : public ::testing::Test {
65  protected:
NonblockingTest()66   NonblockingTest() {}
67 
SetUp()68   void SetUp() override {
69     port_ = grpc_pick_unused_port_or_die();
70     server_address_ << "localhost:" << port_;
71 
72     // Setup server
73     BuildAndStartServer();
74   }
75 
LoopForTag(void ** tag,bool * ok)76   bool LoopForTag(void** tag, bool* ok) {
77     for (;;) {
78       auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
79       if (r == CompletionQueue::SHUTDOWN) {
80         return false;
81       } else if (r == CompletionQueue::GOT_EVENT) {
82         return true;
83       }
84     }
85   }
86 
TearDown()87   void TearDown() override {
88     server_->Shutdown();
89     void* ignored_tag;
90     bool ignored_ok;
91     cq_->Shutdown();
92     while (LoopForTag(&ignored_tag, &ignored_ok))
93       ;
94     stub_.reset();
95     grpc_recycle_unused_port(port_);
96   }
97 
BuildAndStartServer()98   void BuildAndStartServer() {
99     ServerBuilder builder;
100     builder.AddListeningPort(server_address_.str(),
101                              grpc::InsecureServerCredentials());
102     service_.reset(new grpc::testing::EchoTestService::AsyncService());
103     builder.RegisterService(service_.get());
104     cq_ = builder.AddCompletionQueue();
105     server_ = builder.BuildAndStart();
106   }
107 
ResetStub()108   void ResetStub() {
109     std::shared_ptr<Channel> channel = CreateChannel(
110         server_address_.str(), grpc::InsecureChannelCredentials());
111     stub_ = grpc::testing::EchoTestService::NewStub(channel);
112   }
113 
SendRpc(int num_rpcs)114   void SendRpc(int num_rpcs) {
115     for (int i = 0; i < num_rpcs; i++) {
116       EchoRequest send_request;
117       EchoRequest recv_request;
118       EchoResponse send_response;
119       EchoResponse recv_response;
120       Status recv_status;
121 
122       ClientContext cli_ctx;
123       ServerContext srv_ctx;
124       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
125 
126       send_request.set_message("hello non-blocking world");
127       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
128           stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
129 
130       response_reader->StartCall();
131       response_reader->Finish(&recv_response, &recv_status, tag(4));
132 
133       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
134                             cq_.get(), cq_.get(), tag(2));
135 
136       void* got_tag;
137       bool ok;
138       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
139       EXPECT_TRUE(ok);
140       EXPECT_EQ(detag(got_tag), 2);
141       EXPECT_EQ(send_request.message(), recv_request.message());
142 
143       send_response.set_message(recv_request.message());
144       response_writer.Finish(send_response, Status::OK, tag(3));
145 
146       int tagsum = 0;
147       int tagprod = 1;
148       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
149       EXPECT_TRUE(ok);
150       tagsum += detag(got_tag);
151       tagprod *= detag(got_tag);
152 
153       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
154       EXPECT_TRUE(ok);
155       tagsum += detag(got_tag);
156       tagprod *= detag(got_tag);
157 
158       EXPECT_EQ(tagsum, 7);
159       EXPECT_EQ(tagprod, 12);
160       EXPECT_EQ(send_response.message(), recv_response.message());
161       EXPECT_TRUE(recv_status.ok());
162     }
163   }
164 
165   std::unique_ptr<ServerCompletionQueue> cq_;
166   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
167   std::unique_ptr<Server> server_;
168   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
169   std::ostringstream server_address_;
170   int port_;
171 };
172 
TEST_F(NonblockingTest,SimpleRpc)173 TEST_F(NonblockingTest, SimpleRpc) {
174   ResetStub();
175   SendRpc(10);
176 }
177 
178 }  // namespace
179 }  // namespace testing
180 }  // namespace grpc
181 
182 #endif  // GRPC_POSIX_SOCKET
183 
main(int argc,char ** argv)184 int main(int argc, char** argv) {
185 #ifdef GRPC_POSIX_SOCKET
186   // Override the poll function before anything else can happen
187   grpc_poll_function = maybe_assert_non_blocking_poll;
188 #endif  // GRPC_POSIX_SOCKET
189 
190   grpc_test_init(argc, argv);
191   ::testing::InitGoogleTest(&argc, argv);
192   int ret = RUN_ALL_TESTS();
193   return ret;
194 }
195