1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/log.h>
22 #include <grpc/support/string_util.h>
23 
24 #include <functional>
25 #include <memory>
26 #include <thread>
27 
28 #include <gtest/gtest.h>
29 
30 #include "src/core/lib/iomgr/endpoint.h"
31 #include "src/core/lib/iomgr/error.h"
32 #include "src/core/lib/iomgr/pollset.h"
33 #include "src/core/lib/iomgr/pollset_set.h"
34 #include "src/core/lib/iomgr/resolve_address.h"
35 #include "src/core/lib/iomgr/tcp_client.h"
36 #include "src/core/lib/slice/slice_internal.h"
37 
38 #include "test/core/util/port.h"
39 #include "test/core/util/test_config.h"
40 
41 namespace grpc_core {
42 namespace test {
43 namespace {
44 
45 // A gRPC server, running in its own thread.
46 class ServerThread {
47  public:
ServerThread(const char * address)48   explicit ServerThread(const char* address) : address_(address) {}
49 
Start()50   void Start() {
51     // Start server with 1-second handshake timeout.
52     grpc_arg arg;
53     arg.type = GRPC_ARG_INTEGER;
54     arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
55     arg.value.integer = 1000;
56     grpc_channel_args args = {1, &arg};
57     server_ = grpc_server_create(&args, nullptr);
58     ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
59     cq_ = grpc_completion_queue_create_for_next(nullptr);
60     grpc_server_register_completion_queue(server_, cq_, nullptr);
61     grpc_server_start(server_);
62     thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
63   }
64 
Shutdown()65   void Shutdown() {
66     grpc_completion_queue* shutdown_cq =
67         grpc_completion_queue_create_for_pluck(nullptr);
68     grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
69     GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
70                                            grpc_timeout_seconds_to_deadline(1),
71                                            nullptr)
72                    .type == GRPC_OP_COMPLETE);
73     grpc_completion_queue_destroy(shutdown_cq);
74     grpc_server_destroy(server_);
75     grpc_completion_queue_destroy(cq_);
76     thread_->join();
77   }
78 
79  private:
Serve()80   void Serve() {
81     // The completion queue should not return anything other than shutdown.
82     grpc_event ev = grpc_completion_queue_next(
83         cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
84     ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
85   }
86 
87   const char* address_;  // Do not own.
88   grpc_server* server_ = nullptr;
89   grpc_completion_queue* cq_ = nullptr;
90   std::unique_ptr<std::thread> thread_;
91 };
92 
93 // A TCP client that connects to the server, reads data until the server
94 // closes, and then terminates.
95 class Client {
96  public:
Client(const char * server_address)97   explicit Client(const char* server_address)
98       : server_address_(server_address) {}
99 
Connect()100   void Connect() {
101     grpc_core::ExecCtx exec_ctx;
102     grpc_resolved_addresses* server_addresses = nullptr;
103     grpc_error* error =
104         grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
105     ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
106     ASSERT_GE(server_addresses->naddrs, 1UL);
107     pollset_ = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
108     grpc_pollset_init(pollset_, &mu_);
109     grpc_pollset_set* pollset_set = grpc_pollset_set_create();
110     grpc_pollset_set_add_pollset(pollset_set, pollset_);
111     EventState state;
112     grpc_tcp_client_connect(state.closure(), &endpoint_, pollset_set,
113                             nullptr /* channel_args */, server_addresses->addrs,
114                             1000);
115     ASSERT_TRUE(PollUntilDone(
116         &state,
117         grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
118     ASSERT_EQ(GRPC_ERROR_NONE, state.error());
119     grpc_pollset_set_destroy(pollset_set);
120     grpc_endpoint_add_to_pollset(endpoint_, pollset_);
121     grpc_resolved_addresses_destroy(server_addresses);
122   }
123 
124   // Reads until an error is returned.
125   // Returns true if an error was encountered before the deadline.
ReadUntilError()126   bool ReadUntilError() {
127     grpc_core::ExecCtx exec_ctx;
128     grpc_slice_buffer read_buffer;
129     grpc_slice_buffer_init(&read_buffer);
130     bool retval = true;
131     // Use a deadline of 3 seconds, which is a lot more than we should
132     // need for a 1-second timeout, but this helps avoid flakes.
133     grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 3000;
134     while (true) {
135       EventState state;
136       grpc_endpoint_read(endpoint_, &read_buffer, state.closure());
137       if (!PollUntilDone(&state, deadline)) {
138         retval = false;
139         break;
140       }
141       if (state.error() != GRPC_ERROR_NONE) break;
142       gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
143       grpc_slice_buffer_reset_and_unref_internal(&read_buffer);
144     }
145     grpc_endpoint_shutdown(endpoint_,
146                            GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
147     grpc_slice_buffer_destroy_internal(&read_buffer);
148     return retval;
149   }
150 
Shutdown()151   void Shutdown() {
152     grpc_core::ExecCtx exec_ctx;
153     grpc_endpoint_destroy(endpoint_);
154     grpc_pollset_shutdown(pollset_,
155                           GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
156                                               grpc_schedule_on_exec_ctx));
157   }
158 
159  private:
160   // State used to wait for an I/O event.
161   class EventState {
162    public:
EventState()163     EventState() {
164       GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
165                         grpc_schedule_on_exec_ctx);
166     }
167 
~EventState()168     ~EventState() { GRPC_ERROR_UNREF(error_); }
169 
closure()170     grpc_closure* closure() { return &closure_; }
171 
done() const172     bool done() const { return gpr_atm_acq_load(&done_atm_) != 0; }
173 
174     // Caller does NOT take ownership of the error.
error() const175     grpc_error* error() const { return error_; }
176 
177    private:
OnEventDone(void * arg,grpc_error * error)178     static void OnEventDone(void* arg, grpc_error* error) {
179       gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
180       EventState* state = static_cast<EventState*>(arg);
181       state->error_ = GRPC_ERROR_REF(error);
182       gpr_atm_rel_store(&state->done_atm_, 1);
183     }
184 
185     grpc_closure closure_;
186     gpr_atm done_atm_ = 0;
187     grpc_error* error_ = GRPC_ERROR_NONE;
188   };
189 
190   // Returns true if done, or false if deadline exceeded.
PollUntilDone(EventState * state,grpc_millis deadline)191   bool PollUntilDone(EventState* state, grpc_millis deadline) {
192     while (true) {
193       grpc_pollset_worker* worker = nullptr;
194       gpr_mu_lock(mu_);
195       GRPC_LOG_IF_ERROR(
196           "grpc_pollset_work",
197           grpc_pollset_work(pollset_, &worker,
198                             grpc_core::ExecCtx::Get()->Now() + 1000));
199       gpr_mu_unlock(mu_);
200       if (state != nullptr && state->done()) return true;
201       if (grpc_core::ExecCtx::Get()->Now() >= deadline) return false;
202     }
203   }
204 
PollsetDestroy(void * arg,grpc_error * error)205   static void PollsetDestroy(void* arg, grpc_error* error) {
206     grpc_pollset* pollset = static_cast<grpc_pollset*>(arg);
207     grpc_pollset_destroy(pollset);
208     gpr_free(pollset);
209   }
210 
211   const char* server_address_;  // Do not own.
212   grpc_endpoint* endpoint_;
213   gpr_mu* mu_;
214   grpc_pollset* pollset_;
215 };
216 
// Connects a raw TCP client to a server that was started with a 1-second
// handshake timeout and checks that the client's read loop terminates with
// an error before the read loop's own deadline.
TEST(SettingsTimeout, Basic) {
  // Build the "localhost:<port>" address shared by the server and the client.
  char* address = nullptr;
  gpr_asprintf(&address, "localhost:%d", grpc_pick_unused_port_or_die());

  // Bring up the server.
  gpr_log(GPR_INFO, "starting server on %s", address);
  ServerThread server(address);
  server.Start();

  // Open a plain TCP connection to it.
  gpr_log(GPR_INFO, "starting client connect");
  Client tcp_client(address);
  tcp_client.Connect();

  // The client never sends anything, so the server is expected to drop the
  // connection; the read loop must therefore end with an error.
  gpr_log(GPR_INFO, "starting client read");
  EXPECT_TRUE(tcp_client.ReadUntilError());

  // Tear down in reverse order of construction: client first, then server.
  gpr_log(GPR_INFO, "shutting down client");
  tcp_client.Shutdown();
  gpr_log(GPR_INFO, "shutting down server");
  server.Shutdown();

  // The address buffer was allocated by gpr_asprintf().
  gpr_free(address);
}
242 
243 }  // namespace
244 }  // namespace test
245 }  // namespace grpc_core
246 
main(int argc,char ** argv)247 int main(int argc, char** argv) {
248   ::testing::InitGoogleTest(&argc, argv);
249   grpc_test_init(argc, argv);
250   grpc_init();
251   int result = RUN_ALL_TESTS();
252   grpc_shutdown();
253   return result;
254 }
255