1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/iomgr/combiner.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24 
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/thd.h"
27 #include "test/core/util/test_config.h"
28 
test_no_op(void)29 static void test_no_op(void) {
30   gpr_log(GPR_DEBUG, "test_no_op");
31   grpc_core::ExecCtx exec_ctx;
32   GRPC_COMBINER_UNREF(grpc_combiner_create(), "test_no_op");
33 }
34 
set_event_to_true(void * value,grpc_error * error)35 static void set_event_to_true(void* value, grpc_error* error) {
36   gpr_event_set(static_cast<gpr_event*>(value), (void*)1);
37 }
38 
test_execute_one(void)39 static void test_execute_one(void) {
40   gpr_log(GPR_DEBUG, "test_execute_one");
41 
42   grpc_combiner* lock = grpc_combiner_create();
43   gpr_event done;
44   gpr_event_init(&done);
45   grpc_core::ExecCtx exec_ctx;
46   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done,
47                                          grpc_combiner_scheduler(lock)),
48                      GRPC_ERROR_NONE);
49   grpc_core::ExecCtx::Get()->Flush();
50   GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
51              nullptr);
52   GRPC_COMBINER_UNREF(lock, "test_execute_one");
53 }
54 
55 typedef struct {
56   size_t ctr;
57   grpc_combiner* lock;
58   gpr_event done;
59 } thd_args;
60 
// Argument for a single check_one closure.
struct ex_args {
  // Counter to check and then advance.
  size_t* ctr;
  // Value the counter is advanced to; the counter must hold `value - 1`
  // when the closure runs.
  size_t value;
};
65 
check_one(void * a,grpc_error * error)66 static void check_one(void* a, grpc_error* error) {
67   ex_args* args = static_cast<ex_args*>(a);
68   GPR_ASSERT(*args->ctr == args->value - 1);
69   *args->ctr = args->value;
70   gpr_free(a);
71 }
72 
execute_many_loop(void * a)73 static void execute_many_loop(void* a) {
74   thd_args* args = static_cast<thd_args*>(a);
75   grpc_core::ExecCtx exec_ctx;
76   size_t n = 1;
77   for (size_t i = 0; i < 10; i++) {
78     for (size_t j = 0; j < 10000; j++) {
79       ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
80       c->ctr = &args->ctr;
81       c->value = n++;
82       GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(
83                              check_one, c, grpc_combiner_scheduler(args->lock)),
84                          GRPC_ERROR_NONE);
85       grpc_core::ExecCtx::Get()->Flush();
86     }
87     // sleep for a little bit, to test a combiner draining and another thread
88     // picking it up
89     gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
90   }
91   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done,
92                                          grpc_combiner_scheduler(args->lock)),
93                      GRPC_ERROR_NONE);
94 }
95 
test_execute_many(void)96 static void test_execute_many(void) {
97   gpr_log(GPR_DEBUG, "test_execute_many");
98 
99   grpc_combiner* lock = grpc_combiner_create();
100   grpc_core::Thread thds[100];
101   thd_args ta[GPR_ARRAY_SIZE(thds)];
102   for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
103     ta[i].ctr = 0;
104     ta[i].lock = lock;
105     gpr_event_init(&ta[i].done);
106     thds[i] = grpc_core::Thread("grpc_execute_many", execute_many_loop, &ta[i]);
107     thds[i].Start();
108   }
109   for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
110     GPR_ASSERT(gpr_event_wait(&ta[i].done,
111                               gpr_inf_future(GPR_CLOCK_REALTIME)) != nullptr);
112     thds[i].Join();
113   }
114   grpc_core::ExecCtx exec_ctx;
115   GRPC_COMBINER_UNREF(lock, "test_execute_many");
116 }
117 
118 static gpr_event got_in_finally;
119 
in_finally(void * arg,grpc_error * error)120 static void in_finally(void* arg, grpc_error* error) {
121   gpr_event_set(&got_in_finally, (void*)1);
122 }
123 
add_finally(void * arg,grpc_error * error)124 static void add_finally(void* arg, grpc_error* error) {
125   GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg,
126                                          grpc_combiner_finally_scheduler(
127                                              static_cast<grpc_combiner*>(arg))),
128                      GRPC_ERROR_NONE);
129 }
130 
test_execute_finally(void)131 static void test_execute_finally(void) {
132   gpr_log(GPR_DEBUG, "test_execute_finally");
133 
134   grpc_combiner* lock = grpc_combiner_create();
135   grpc_core::ExecCtx exec_ctx;
136   gpr_event_init(&got_in_finally);
137   GRPC_CLOSURE_SCHED(
138       GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)),
139       GRPC_ERROR_NONE);
140   grpc_core::ExecCtx::Get()->Flush();
141   GPR_ASSERT(gpr_event_wait(&got_in_finally,
142                             grpc_timeout_seconds_to_deadline(5)) != nullptr);
143   GRPC_COMBINER_UNREF(lock, "test_execute_finally");
144 }
145 
main(int argc,char ** argv)146 int main(int argc, char** argv) {
147   grpc_test_init(argc, argv);
148   grpc_init();
149   test_no_op();
150   test_execute_one();
151   test_execute_finally();
152   test_execute_many();
153   grpc_shutdown();
154 
155   return 0;
156 }
157