1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
20 using that endpoint. Because of various transitive includes in uv.h,
21 including windows.h on Windows, uv.h must be included before other system
22 headers. Therefore, sockaddr.h must always be included first */
23 #include "src/core/lib/iomgr/sockaddr.h"
24
25 #include <string.h>
26
27 #include <grpc/grpc.h>
28 #include <grpc/slice.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31
32 #include "src/core/lib/gpr/host_port.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/memory.h"
35 #include "src/core/lib/gprpp/thd.h"
36 #include "src/core/lib/iomgr/sockaddr.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 #include "test/core/end2end/cq_verifier.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42 #include "test/core/util/test_tcp_server.h"
43
/* Canned HTTP/1.0 "400 Bad Request" reply with an empty body. main() feeds it
 * to run_test() for the "http1 response" case, where the client is expected to
 * report a status whose details contain HTTP1_DETAIL_MSG. */
#define HTTP1_RESP                           \
  "HTTP/1.0 400 Bad Request\n"               \
  "Content-Type: text/html; charset=UTF-8\n" \
  "Content-Length: 0\n"                      \
  "Date: Tue, 07 Jun 2016 17:43:20 GMT\n\n"
49
/* Hand-encoded HTTP/2 response whose :status header is STATUS_CODE (the macro
 * argument is stringified, so it must be a literal such as 204).
 *
 * Byte layout, read against the HTTP/2 frame format and HPACK encoding:
 *   - 9-byte frame header: length 0, type 0x04 (SETTINGS), flags 0, stream 0.
 *   - 9-byte frame header: length 0x00003e ('>' == 62), type 0x01 (HEADERS),
 *     flags 0x04 (END_HEADERS), stream 1.
 *   - Three header fields, each introduced by 0x10 followed by a one-byte name
 *     length, the name, a one-byte value length and the value:
 *       content-length: 0
 *       content-type: application/grpc
 *       :status: STATUS_CODE
 *
 * The HEADERS length of 62 and the ":status" value length of 3 are hard-coded,
 * so STATUS_CODE must be exactly three characters long.
 *
 * The string contains embedded NUL bytes; callers must take its size with
 * sizeof(HTTP2_RESP(code)) - 1 rather than strlen(). */
#define HTTP2_RESP(STATUS_CODE)          \
  "\x00\x00\x00\x04\x00\x00\x00\x00\x00" \
  "\x00\x00>\x01\x04\x00\x00\x00\x01"    \
  "\x10\x0e"                             \
  "content-length\x01"                   \
  "0"                                    \
  "\x10\x0c"                             \
  "content-type\x10"                     \
  "application/grpc"                     \
  "\x10\x07:status\x03" #STATUS_CODE
60
/* Payload used by main() for the "unparseable response" case: plain text that
 * is neither an HTTP/2 frame nor an HTTP/1.x status line. */
#define UNPARSEABLE_RESP "Bad Request\n"

/* Substring that start_rpc() expects to find in the status details when the
 * server answers with HTTP2_RESP(STATUS_CODE). */
#define HTTP2_DETAIL_MSG(STATUS_CODE) \
  "Received http2 header with status: " #STATUS_CODE

/* Substring that start_rpc() expects to find in the status details when the
 * server answers with HTTP1_RESP. */
#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
67
/* TODO(zyc) Check the content of incoming data instead of using this length */
/* The 'bad' server will start sending responses after reading this amount of
 * data from the client. The expansion is wrapped in parentheses so that the
 * cast stays a single operand wherever the macro is substituted (for example
 * after sizeof or before a postfix operator). */
#define SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD ((size_t)200)
72
/* All mutable state shared between the client side of the test (start_rpc /
 * cleanup_rpc) and the fake server callbacks (on_connect / handle_read /
 * handle_write / done_write). A single file-scope instance, `state`, is used. */
struct rpc_state {
  /* "host:port" string built by gpr_join_host_port() in start_rpc(); released
   * with gpr_free() in cleanup_rpc(). */
  char* target;
  /* Client completion queue created in start_rpc(). */
  grpc_completion_queue* cq;
  /* Client channel to `target`, created in start_rpc(). */
  grpc_channel* channel;
  /* The single call issued on `channel`. */
  grpc_call* call;
  /* Running total of bytes the server has read; reset in on_connect() and
   * accumulated in handle_read(). */
  size_t incoming_data_length;
  /* Buffer handed to grpc_endpoint_read() for each server-side read. */
  grpc_slice_buffer temp_incoming_buffer;
  /* Buffer holding the canned response passed to grpc_endpoint_write(). */
  grpc_slice_buffer outgoing_buffer;
  /* Server-side endpoint received in on_connect(). */
  grpc_endpoint* tcp;
  /* Set to 1 by done_write(); polled by actually_poll_server(). */
  gpr_atm done_atm;
  /* Cleared in poll_server_until_read_done(); not read anywhere else in this
   * file. */
  bool write_done;
  /* Bytes the server sends back, and their length; set by run_test(). */
  const char* response_payload;
  size_t response_payload_length;
};
87
/* Port the fake server listens on; picked anew by each run_test() call. */
static int server_port;
/* The one shared rpc_state instance used by every function in this file. */
static struct rpc_state state;
/* Closure bound to handle_read() in on_connect(); passed to
 * grpc_endpoint_read(). */
static grpc_closure on_read;
/* Closure bound to done_write() in on_connect(); passed to
 * grpc_endpoint_write(). */
static grpc_closure on_write;
92
/* Turns an integer into an opaque pointer-sized value, used in this file as a
 * completion-queue tag. */
static void* tag(intptr_t t) {
  void* opaque = (void*)t;
  return opaque;
}
94
done_write(void * arg,grpc_error * error)95 static void done_write(void* arg, grpc_error* error) {
96 GPR_ASSERT(error == GRPC_ERROR_NONE);
97
98 gpr_atm_rel_store(&state.done_atm, 1);
99 }
100
handle_write()101 static void handle_write() {
102 grpc_slice slice = grpc_slice_from_copied_buffer(
103 state.response_payload, state.response_payload_length);
104
105 grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
106 grpc_slice_buffer_add(&state.outgoing_buffer, slice);
107 grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
108 }
109
handle_read(void * arg,grpc_error * error)110 static void handle_read(void* arg, grpc_error* error) {
111 GPR_ASSERT(error == GRPC_ERROR_NONE);
112 state.incoming_data_length += state.temp_incoming_buffer.length;
113
114 size_t i;
115 for (i = 0; i < state.temp_incoming_buffer.count; i++) {
116 char* dump = grpc_dump_slice(state.temp_incoming_buffer.slices[i],
117 GPR_DUMP_HEX | GPR_DUMP_ASCII);
118 gpr_log(GPR_DEBUG, "Server received: %s", dump);
119 gpr_free(dump);
120 }
121
122 gpr_log(GPR_DEBUG, "got %" PRIuPTR " bytes, expected %" PRIuPTR " bytes",
123 state.incoming_data_length,
124 SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD);
125 if (state.incoming_data_length >=
126 SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) {
127 handle_write();
128 } else {
129 grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read);
130 }
131 }
132
on_connect(void * arg,grpc_endpoint * tcp,grpc_pollset * accepting_pollset,grpc_tcp_server_acceptor * acceptor)133 static void on_connect(void* arg, grpc_endpoint* tcp,
134 grpc_pollset* accepting_pollset,
135 grpc_tcp_server_acceptor* acceptor) {
136 gpr_free(acceptor);
137 test_tcp_server* server = static_cast<test_tcp_server*>(arg);
138 GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx);
139 GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx);
140 grpc_slice_buffer_init(&state.temp_incoming_buffer);
141 grpc_slice_buffer_init(&state.outgoing_buffer);
142 state.tcp = tcp;
143 state.incoming_data_length = 0;
144 grpc_endpoint_add_to_pollset(tcp, server->pollset);
145 grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read);
146 }
147
n_sec_deadline(int seconds)148 static gpr_timespec n_sec_deadline(int seconds) {
149 return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
150 gpr_time_from_seconds(seconds, GPR_TIMESPAN));
151 }
152
start_rpc(int target_port,grpc_status_code expected_status,const char * expected_detail)153 static void start_rpc(int target_port, grpc_status_code expected_status,
154 const char* expected_detail) {
155 grpc_op ops[6];
156 grpc_op* op;
157 grpc_metadata_array initial_metadata_recv;
158 grpc_metadata_array trailing_metadata_recv;
159 grpc_status_code status;
160 grpc_call_error error;
161 cq_verifier* cqv;
162 grpc_slice details;
163
164 state.cq = grpc_completion_queue_create_for_next(nullptr);
165 cqv = cq_verifier_create(state.cq);
166 gpr_join_host_port(&state.target, "127.0.0.1", target_port);
167 state.channel = grpc_insecure_channel_create(state.target, nullptr, nullptr);
168 grpc_slice host = grpc_slice_from_static_string("localhost");
169 state.call = grpc_channel_create_call(
170 state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq,
171 grpc_slice_from_static_string("/Service/Method"), &host,
172 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
173
174 grpc_metadata_array_init(&initial_metadata_recv);
175 grpc_metadata_array_init(&trailing_metadata_recv);
176
177 memset(ops, 0, sizeof(ops));
178 op = ops;
179 op->op = GRPC_OP_SEND_INITIAL_METADATA;
180 op->data.send_initial_metadata.count = 0;
181 op->flags = 0;
182 op->reserved = nullptr;
183 op++;
184 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
185 op->flags = 0;
186 op->reserved = nullptr;
187 op++;
188 op->op = GRPC_OP_RECV_INITIAL_METADATA;
189 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
190 op->flags = 0;
191 op->reserved = nullptr;
192 op++;
193 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
194 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
195 op->data.recv_status_on_client.status = &status;
196 op->data.recv_status_on_client.status_details = &details;
197 op->flags = 0;
198 op->reserved = nullptr;
199 op++;
200 error = grpc_call_start_batch(state.call, ops, static_cast<size_t>(op - ops),
201 tag(1), nullptr);
202
203 GPR_ASSERT(GRPC_CALL_OK == error);
204
205 CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
206 cq_verify(cqv);
207
208 GPR_ASSERT(status == expected_status);
209 if (expected_detail != nullptr) {
210 GPR_ASSERT(-1 != grpc_slice_slice(details, grpc_slice_from_static_string(
211 expected_detail)));
212 }
213
214 grpc_metadata_array_destroy(&initial_metadata_recv);
215 grpc_metadata_array_destroy(&trailing_metadata_recv);
216 grpc_slice_unref(details);
217 cq_verifier_destroy(cqv);
218 }
219
cleanup_rpc()220 static void cleanup_rpc() {
221 grpc_event ev;
222 grpc_slice_buffer_destroy_internal(&state.temp_incoming_buffer);
223 grpc_slice_buffer_destroy_internal(&state.outgoing_buffer);
224 grpc_call_unref(state.call);
225 grpc_completion_queue_shutdown(state.cq);
226 do {
227 ev = grpc_completion_queue_next(state.cq, n_sec_deadline(1), nullptr);
228 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
229 grpc_completion_queue_destroy(state.cq);
230 grpc_channel_destroy(state.channel);
231 gpr_free(state.target);
232 }
233
234 typedef struct {
235 test_tcp_server* server;
236 gpr_event* signal_when_done;
237 } poll_args;
238
actually_poll_server(void * arg)239 static void actually_poll_server(void* arg) {
240 poll_args* pa = static_cast<poll_args*>(arg);
241 gpr_timespec deadline = n_sec_deadline(10);
242 while (true) {
243 bool done = gpr_atm_acq_load(&state.done_atm) != 0;
244 gpr_timespec time_left =
245 gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME));
246 gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", done,
247 time_left.tv_sec, time_left.tv_nsec);
248 if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) {
249 break;
250 }
251 test_tcp_server_poll(pa->server, 1);
252 }
253 gpr_event_set(pa->signal_when_done, (void*)1);
254 gpr_free(pa);
255 }
256
poll_server_until_read_done(test_tcp_server * server,gpr_event * signal_when_done)257 static grpc_core::Thread* poll_server_until_read_done(
258 test_tcp_server* server, gpr_event* signal_when_done) {
259 gpr_atm_rel_store(&state.done_atm, 0);
260 state.write_done = 0;
261 poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa)));
262 pa->server = server;
263 pa->signal_when_done = signal_when_done;
264 auto* th = grpc_core::New<grpc_core::Thread>("grpc_poll_server",
265 actually_poll_server, pa);
266 th->Start();
267 return th;
268 }
269
run_test(const char * response_payload,size_t response_payload_length,grpc_status_code expected_status,const char * expected_detail)270 static void run_test(const char* response_payload,
271 size_t response_payload_length,
272 grpc_status_code expected_status,
273 const char* expected_detail) {
274 test_tcp_server test_server;
275 grpc_core::ExecCtx exec_ctx;
276 gpr_event ev;
277
278 grpc_init();
279 gpr_event_init(&ev);
280 server_port = grpc_pick_unused_port_or_die();
281 test_tcp_server_init(&test_server, on_connect, &test_server);
282 test_tcp_server_start(&test_server, server_port);
283 state.response_payload = response_payload;
284 state.response_payload_length = response_payload_length;
285
286 /* poll server until sending out the response */
287 grpc_core::UniquePtr<grpc_core::Thread> thdptr(
288 poll_server_until_read_done(&test_server, &ev));
289 start_rpc(server_port, expected_status, expected_detail);
290 gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
291 thdptr->Join();
292
293 /* clean up */
294 grpc_endpoint_shutdown(state.tcp,
295 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
296 grpc_endpoint_destroy(state.tcp);
297 cleanup_rpc();
298 grpc_core::ExecCtx::Get()->Flush();
299 test_tcp_server_destroy(&test_server);
300
301 grpc_shutdown();
302 }
303
main(int argc,char ** argv)304 int main(int argc, char** argv) {
305 grpc_test_init(argc, argv);
306 grpc_init();
307
308 /* status defined in hpack static table */
309 run_test(HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, GRPC_STATUS_CANCELLED,
310 HTTP2_DETAIL_MSG(204));
311
312 run_test(HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1, GRPC_STATUS_CANCELLED,
313 HTTP2_DETAIL_MSG(206));
314
315 run_test(HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1, GRPC_STATUS_CANCELLED,
316 HTTP2_DETAIL_MSG(304));
317
318 run_test(HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1, GRPC_STATUS_CANCELLED,
319 HTTP2_DETAIL_MSG(400));
320
321 run_test(HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, GRPC_STATUS_CANCELLED,
322 HTTP2_DETAIL_MSG(404));
323
324 run_test(HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1, GRPC_STATUS_CANCELLED,
325 HTTP2_DETAIL_MSG(500));
326
327 /* status not defined in hpack static table */
328 run_test(HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1, GRPC_STATUS_CANCELLED,
329 HTTP2_DETAIL_MSG(401));
330
331 run_test(HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1, GRPC_STATUS_CANCELLED,
332 HTTP2_DETAIL_MSG(403));
333
334 run_test(HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1, GRPC_STATUS_CANCELLED,
335 HTTP2_DETAIL_MSG(502));
336
337 /* unparseable response */
338 run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, GRPC_STATUS_UNKNOWN,
339 nullptr);
340
341 /* http1 response */
342 run_test(HTTP1_RESP, sizeof(HTTP1_RESP) - 1, GRPC_STATUS_UNAVAILABLE,
343 HTTP1_DETAIL_MSG);
344
345 grpc_shutdown();
346 return 0;
347 }
348