1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/grpc.h>
20 #include <grpc/grpc_security.h>
21
22 #include <signal.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <time.h>
27 #ifndef _WIN32
28 /* This is for _exit() below, which is temporary. */
29 #include <unistd.h>
30 #endif
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 #include <grpc/support/time.h>
35
36 #include "src/core/lib/gpr/host_port.h"
37 #include "test/core/end2end/data/ssl_test_data.h"
38 #include "test/core/util/cmdline.h"
39 #include "test/core/util/memory_counters.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
42
43 static grpc_completion_queue* cq;
44 static grpc_server* server;
45 static grpc_op metadata_ops[2];
46 static grpc_op snapshot_ops[5];
47 static grpc_op status_op;
48 static int got_sigint = 0;
49 static grpc_byte_buffer* payload_buffer = nullptr;
50 static grpc_byte_buffer* terminal_buffer = nullptr;
51 static int was_cancelled = 2;
52
/* Converts an integer into the opaque pointer form used for completion queue
   tags. */
static void* tag(intptr_t t) {
  void* as_pointer = (void*)t;
  return as_pointer;
}
54
/* Per-call state stored in fling_call::state. main() dispatches on it each
   time a batch for that call completes. */
typedef enum {
  /* Slot is waiting for (or has just received) an incoming call. */
  FLING_SERVER_NEW_REQUEST = 1,
  /* Initial metadata for a unary call has been sent. */
  FLING_SERVER_SEND_INIT_METADATA = 2,
  /* Call is parked until a DestroyCalls request finishes it. */
  FLING_SERVER_WAIT_FOR_DESTROY = 3,
  /* Final status has been sent on a parked call. */
  FLING_SERVER_SEND_STATUS_FLING_CALL = 4,
  /* A snapshot reply batch is in flight. */
  FLING_SERVER_SEND_STATUS_SNAPSHOT = 5,
  /* A snapshot reply that also finishes every parked call. */
  FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL = 6
} fling_server_tags;
63
64 typedef struct {
65 fling_server_tags state;
66 grpc_call* call;
67 grpc_call_details call_details;
68 grpc_metadata_array request_metadata_recv;
69 grpc_metadata_array initial_metadata_send;
70 } fling_call;
71
// Slots for regular calls plus 6 snapshot calls.
// NOTE(review): the array holds 100006 entries, while the earlier comment and
// the "Used all call slots" log message both mention 10000 -- confirm which
// figure is intended.
static fling_call calls[100006];
74
request_call_unary(int call_idx)75 static void request_call_unary(int call_idx) {
76 if (call_idx == static_cast<int>(sizeof(calls) / sizeof(fling_call))) {
77 gpr_log(GPR_INFO, "Used all call slots (10000) on server. Server exit.");
78 _exit(0);
79 }
80 grpc_metadata_array_init(&calls[call_idx].request_metadata_recv);
81 grpc_server_request_call(
82 server, &calls[call_idx].call, &calls[call_idx].call_details,
83 &calls[call_idx].request_metadata_recv, cq, cq, &calls[call_idx]);
84 }
85
send_initial_metadata_unary(void * tag)86 static void send_initial_metadata_unary(void* tag) {
87 grpc_metadata_array_init(
88 &(*static_cast<fling_call*>(tag)).initial_metadata_send);
89 metadata_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
90 metadata_ops[0].data.send_initial_metadata.count = 0;
91
92 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch((*(fling_call*)tag).call,
93 metadata_ops, 1, tag,
94 nullptr));
95 }
96
send_status(void * tag)97 static void send_status(void* tag) {
98 status_op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
99 status_op.data.send_status_from_server.status = GRPC_STATUS_OK;
100 status_op.data.send_status_from_server.trailing_metadata_count = 0;
101 grpc_slice details = grpc_slice_from_static_string("");
102 status_op.data.send_status_from_server.status_details = &details;
103
104 GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch((*(fling_call*)tag).call,
105 &status_op, 1, tag,
106 nullptr));
107 }
108
send_snapshot(void * tag,struct grpc_memory_counters * snapshot)109 static void send_snapshot(void* tag, struct grpc_memory_counters* snapshot) {
110 grpc_op* op;
111
112 grpc_slice snapshot_slice =
113 grpc_slice_new(snapshot, sizeof(*snapshot), gpr_free);
114 payload_buffer = grpc_raw_byte_buffer_create(&snapshot_slice, 1);
115 grpc_metadata_array_init(
116 &(*static_cast<fling_call*>(tag)).initial_metadata_send);
117
118 op = snapshot_ops;
119 op->op = GRPC_OP_SEND_INITIAL_METADATA;
120 op->data.send_initial_metadata.count = 0;
121 op++;
122 op->op = GRPC_OP_RECV_MESSAGE;
123 op->data.recv_message.recv_message = &terminal_buffer;
124 op++;
125 op->op = GRPC_OP_SEND_MESSAGE;
126 if (payload_buffer == nullptr) {
127 gpr_log(GPR_INFO, "NULL payload buffer !!!");
128 }
129 op->data.send_message.send_message = payload_buffer;
130 op++;
131 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
132 op->data.send_status_from_server.status = GRPC_STATUS_OK;
133 op->data.send_status_from_server.trailing_metadata_count = 0;
134 grpc_slice details = grpc_slice_from_static_string("");
135 op->data.send_status_from_server.status_details = &details;
136 op++;
137 op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
138 op->data.recv_close_on_server.cancelled = &was_cancelled;
139 op++;
140
141 GPR_ASSERT(GRPC_CALL_OK ==
142 grpc_call_start_batch((*(fling_call*)tag).call, snapshot_ops,
143 (size_t)(op - snapshot_ops), tag, nullptr));
144 }
/* A graceful exit currently runs into some sort of deadlock, so SIGINT simply
   terminates the process on the spot. Once that is resolved, please remove
   the #include <unistd.h> above, which is only there for _exit(). */
static void sigint_handler(int x) {
  (void)x; /* the signal number is not needed */
  _exit(0);
}
148
main(int argc,char ** argv)149 int main(int argc, char** argv) {
150 grpc_memory_counters_init();
151 grpc_event ev;
152 char* addr_buf = nullptr;
153 gpr_cmdline* cl;
154 grpc_completion_queue* shutdown_cq;
155 int shutdown_started = 0;
156 int shutdown_finished = 0;
157
158 int secure = 0;
159 const char* addr = nullptr;
160
161 char* fake_argv[1];
162
163 GPR_ASSERT(argc >= 1);
164 fake_argv[0] = argv[0];
165 grpc_test_init(1, fake_argv);
166
167 grpc_init();
168 srand(static_cast<unsigned>(clock()));
169
170 cl = gpr_cmdline_create("fling server");
171 gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr);
172 gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure);
173 gpr_cmdline_parse(cl, argc, argv);
174 gpr_cmdline_destroy(cl);
175
176 if (addr == nullptr) {
177 gpr_join_host_port(&addr_buf, "::", grpc_pick_unused_port_or_die());
178 addr = addr_buf;
179 }
180 gpr_log(GPR_INFO, "creating server on: %s", addr);
181
182 cq = grpc_completion_queue_create_for_next(nullptr);
183
184 struct grpc_memory_counters before_server_create =
185 grpc_memory_counters_snapshot();
186 if (secure) {
187 grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key,
188 test_server1_cert};
189 grpc_server_credentials* ssl_creds = grpc_ssl_server_credentials_create(
190 nullptr, &pem_key_cert_pair, 1, 0, nullptr);
191 server = grpc_server_create(nullptr, nullptr);
192 GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds));
193 grpc_server_credentials_release(ssl_creds);
194 } else {
195 server = grpc_server_create(nullptr, nullptr);
196 GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr));
197 }
198
199 grpc_server_register_completion_queue(server, cq, nullptr);
200 grpc_server_start(server);
201
202 struct grpc_memory_counters after_server_create =
203 grpc_memory_counters_snapshot();
204
205 gpr_free(addr_buf);
206 addr = addr_buf = nullptr;
207
208 // initialize call instances
209 for (int i = 0; i < static_cast<int>(sizeof(calls) / sizeof(fling_call));
210 i++) {
211 grpc_call_details_init(&calls[i].call_details);
212 calls[i].state = FLING_SERVER_NEW_REQUEST;
213 }
214
215 int next_call_idx = 0;
216 struct grpc_memory_counters current_snapshot;
217
218 request_call_unary(next_call_idx);
219
220 signal(SIGINT, sigint_handler);
221
222 while (!shutdown_finished) {
223 if (got_sigint && !shutdown_started) {
224 gpr_log(GPR_INFO, "Shutting down due to SIGINT");
225
226 shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
227 grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000));
228 GPR_ASSERT(grpc_completion_queue_pluck(
229 shutdown_cq, tag(1000),
230 grpc_timeout_seconds_to_deadline(5), nullptr)
231 .type == GRPC_OP_COMPLETE);
232 grpc_completion_queue_destroy(shutdown_cq);
233 grpc_completion_queue_shutdown(cq);
234 shutdown_started = 1;
235 }
236 ev = grpc_completion_queue_next(
237 cq,
238 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
239 gpr_time_from_micros(1000000, GPR_TIMESPAN)),
240 nullptr);
241 fling_call* s = static_cast<fling_call*>(ev.tag);
242 switch (ev.type) {
243 case GRPC_OP_COMPLETE:
244 switch (s->state) {
245 case FLING_SERVER_NEW_REQUEST:
246 request_call_unary(++next_call_idx);
247 if (0 == grpc_slice_str_cmp(s->call_details.method,
248 "/Reflector/reflectUnary")) {
249 s->state = FLING_SERVER_SEND_INIT_METADATA;
250 send_initial_metadata_unary(s);
251 } else if (0 ==
252 grpc_slice_str_cmp(s->call_details.method,
253 "Reflector/GetBeforeSvrCreation")) {
254 s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
255 send_snapshot(s, &before_server_create);
256 } else if (0 ==
257 grpc_slice_str_cmp(s->call_details.method,
258 "Reflector/GetAfterSvrCreation")) {
259 s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
260 send_snapshot(s, &after_server_create);
261 } else if (0 == grpc_slice_str_cmp(s->call_details.method,
262 "Reflector/SimpleSnapshot")) {
263 s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
264 current_snapshot = grpc_memory_counters_snapshot();
265 send_snapshot(s, ¤t_snapshot);
266 } else if (0 == grpc_slice_str_cmp(s->call_details.method,
267 "Reflector/DestroyCalls")) {
268 s->state = FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL;
269 current_snapshot = grpc_memory_counters_snapshot();
270 send_snapshot(s, ¤t_snapshot);
271 } else {
272 gpr_log(GPR_ERROR, "Wrong call method");
273 }
274 break;
275 case FLING_SERVER_SEND_INIT_METADATA:
276 s->state = FLING_SERVER_WAIT_FOR_DESTROY;
277 break;
278 case FLING_SERVER_WAIT_FOR_DESTROY:
279 break;
280 case FLING_SERVER_SEND_STATUS_FLING_CALL:
281 grpc_call_unref(s->call);
282 grpc_call_details_destroy(&s->call_details);
283 grpc_metadata_array_destroy(&s->initial_metadata_send);
284 grpc_metadata_array_destroy(&s->request_metadata_recv);
285 break;
286 case FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL:
287 for (int k = 0;
288 k < static_cast<int>(sizeof(calls) / sizeof(fling_call));
289 ++k) {
290 if (calls[k].state == FLING_SERVER_WAIT_FOR_DESTROY) {
291 calls[k].state = FLING_SERVER_SEND_STATUS_FLING_CALL;
292 send_status(&calls[k]);
293 }
294 }
295 // no break here since we want to continue to case
296 // FLING_SERVER_SEND_STATUS_SNAPSHOT to destroy the snapshot call
297 /* fallthrough */
298 case FLING_SERVER_SEND_STATUS_SNAPSHOT:
299 grpc_byte_buffer_destroy(payload_buffer);
300 grpc_byte_buffer_destroy(terminal_buffer);
301 grpc_call_unref(s->call);
302 grpc_call_details_destroy(&s->call_details);
303 grpc_metadata_array_destroy(&s->initial_metadata_send);
304 grpc_metadata_array_destroy(&s->request_metadata_recv);
305 terminal_buffer = nullptr;
306 payload_buffer = nullptr;
307 break;
308 }
309 break;
310 case GRPC_QUEUE_SHUTDOWN:
311 GPR_ASSERT(shutdown_started);
312 shutdown_finished = 1;
313 break;
314 case GRPC_QUEUE_TIMEOUT:
315 break;
316 }
317 }
318
319 grpc_server_destroy(server);
320 grpc_completion_queue_destroy(cq);
321 grpc_shutdown();
322 grpc_memory_counters_destroy();
323 return 0;
324 }
325