1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 
26 #include "src/core/lib/debug/trace.h"
27 #include "src/core/lib/gprpp/thd.h"
28 #include "src/core/lib/iomgr/timer.h"
29 #include "src/core/lib/iomgr/timer_manager.h"
30 
31 struct completed_thread {
32   grpc_core::Thread thd;
33   completed_thread* next;
34 };
35 
36 extern grpc_core::TraceFlag grpc_timer_check_trace;
37 
38 // global mutex
39 static gpr_mu g_mu;
40 // are we multi-threaded
41 static bool g_threaded;
42 // cv to wait until a thread is needed
43 static gpr_cv g_cv_wait;
44 // cv for notification when threading ends
45 static gpr_cv g_cv_shutdown;
46 // number of threads in the system
47 static int g_thread_count;
48 // number of threads sitting around waiting
49 static int g_waiter_count;
50 // linked list of threads that have completed (and need joining)
51 static completed_thread* g_completed_threads;
52 // was the manager kicked by the timer system
53 static bool g_kicked;
54 // is there a thread waiting until the next timer should fire?
55 static bool g_has_timed_waiter;
56 // the deadline of the current timed waiter thread (only relevant if
57 // g_has_timed_waiter is true)
58 static grpc_millis g_timed_waiter_deadline;
59 // generation counter to track which thread is waiting for the next timer
60 static uint64_t g_timed_waiter_generation;
61 
62 static void timer_thread(void* completed_thread_ptr);
63 
gc_completed_threads(void)64 static void gc_completed_threads(void) {
65   if (g_completed_threads != nullptr) {
66     completed_thread* to_gc = g_completed_threads;
67     g_completed_threads = nullptr;
68     gpr_mu_unlock(&g_mu);
69     while (to_gc != nullptr) {
70       to_gc->thd.Join();
71       completed_thread* next = to_gc->next;
72       gpr_free(to_gc);
73       to_gc = next;
74     }
75     gpr_mu_lock(&g_mu);
76   }
77 }
78 
start_timer_thread_and_unlock(void)79 static void start_timer_thread_and_unlock(void) {
80   GPR_ASSERT(g_threaded);
81   ++g_waiter_count;
82   ++g_thread_count;
83   gpr_mu_unlock(&g_mu);
84   if (grpc_timer_check_trace.enabled()) {
85     gpr_log(GPR_INFO, "Spawn timer thread");
86   }
87   completed_thread* ct =
88       static_cast<completed_thread*>(gpr_malloc(sizeof(*ct)));
89   ct->thd = grpc_core::Thread("grpc_global_timer", timer_thread, ct);
90   ct->thd.Start();
91 }
92 
grpc_timer_manager_tick()93 void grpc_timer_manager_tick() {
94   grpc_core::ExecCtx exec_ctx;
95   grpc_millis next = GRPC_MILLIS_INF_FUTURE;
96   grpc_timer_check(&next);
97 }
98 
run_some_timers()99 static void run_some_timers() {
100   // if there's something to execute...
101   gpr_mu_lock(&g_mu);
102   // remove a waiter from the pool, and start another thread if necessary
103   --g_waiter_count;
104   if (g_waiter_count == 0 && g_threaded) {
105     start_timer_thread_and_unlock();
106   } else {
107     // if there's no thread waiting with a timeout, kick an existing
108     // waiter so that the next deadline is not missed
109     if (!g_has_timed_waiter) {
110       if (grpc_timer_check_trace.enabled()) {
111         gpr_log(GPR_INFO, "kick untimed waiter");
112       }
113       gpr_cv_signal(&g_cv_wait);
114     }
115     gpr_mu_unlock(&g_mu);
116   }
117   // without our lock, flush the exec_ctx
118   if (grpc_timer_check_trace.enabled()) {
119     gpr_log(GPR_INFO, "flush exec_ctx");
120   }
121   grpc_core::ExecCtx::Get()->Flush();
122   gpr_mu_lock(&g_mu);
123   // garbage collect any threads hanging out that are dead
124   gc_completed_threads();
125   // get ready to wait again
126   ++g_waiter_count;
127   gpr_mu_unlock(&g_mu);
128 }
129 
130 // wait until 'next' (or forever if there is already a timed waiter in the pool)
131 // returns true if the thread should continue executing (false if it should
132 // shutdown)
wait_until(grpc_millis next)133 static bool wait_until(grpc_millis next) {
134   gpr_mu_lock(&g_mu);
135   // if we're not threaded anymore, leave
136   if (!g_threaded) {
137     gpr_mu_unlock(&g_mu);
138     return false;
139   }
140 
141   // If g_kicked is true at this point, it means there was a kick from the timer
142   // system that the timer-manager threads here missed. We cannot trust 'next'
143   // here any longer (since there might be an earlier deadline). So if g_kicked
144   // is true at this point, we should quickly exit this and get the next
145   // deadline from the timer system
146 
147   if (!g_kicked) {
148     // if there's no timed waiter, we should become one: that waiter waits
149     // only until the next timer should expire. All other timers wait forever
150     //
151     // 'g_timed_waiter_generation' is a global generation counter. The idea here
152     // is that the thread becoming a timed-waiter increments and stores this
153     // global counter locally in 'my_timed_waiter_generation' before going to
154     // sleep. After waking up, if my_timed_waiter_generation ==
155     // g_timed_waiter_generation, it can be sure that it was the timed_waiter
156     // thread (and that no other thread took over while this was asleep)
157     //
158     // Initialize my_timed_waiter_generation to some value that is NOT equal to
159     // g_timed_waiter_generation
160     uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1;
161 
162     /* If there's no timed waiter, we should become one: that waiter waits only
163        until the next timer should expire. All other timer threads wait forever
164        unless their 'next' is earlier than the current timed-waiter's deadline
165        (in which case the thread with earlier 'next' takes over as the new timed
166        waiter) */
167     if (next != GRPC_MILLIS_INF_FUTURE) {
168       if (!g_has_timed_waiter || (next < g_timed_waiter_deadline)) {
169         my_timed_waiter_generation = ++g_timed_waiter_generation;
170         g_has_timed_waiter = true;
171         g_timed_waiter_deadline = next;
172 
173         if (grpc_timer_check_trace.enabled()) {
174           grpc_millis wait_time = next - grpc_core::ExecCtx::Get()->Now();
175           gpr_log(GPR_INFO, "sleep for a %" PRId64 " milliseconds", wait_time);
176         }
177       } else {  // g_timed_waiter == true && next >= g_timed_waiter_deadline
178         next = GRPC_MILLIS_INF_FUTURE;
179       }
180     }
181 
182     if (grpc_timer_check_trace.enabled() && next == GRPC_MILLIS_INF_FUTURE) {
183       gpr_log(GPR_INFO, "sleep until kicked");
184     }
185 
186     gpr_cv_wait(&g_cv_wait, &g_mu,
187                 grpc_millis_to_timespec(next, GPR_CLOCK_MONOTONIC));
188 
189     if (grpc_timer_check_trace.enabled()) {
190       gpr_log(GPR_INFO, "wait ended: was_timed:%d kicked:%d",
191               my_timed_waiter_generation == g_timed_waiter_generation,
192               g_kicked);
193     }
194     // if this was the timed waiter, then we need to check timers, and flag
195     // that there's now no timed waiter... we'll look for a replacement if
196     // there's work to do after checking timers (code above)
197     if (my_timed_waiter_generation == g_timed_waiter_generation) {
198       g_has_timed_waiter = false;
199       g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
200     }
201   }
202 
203   // if this was a kick from the timer system, consume it (and don't stop
204   // this thread yet)
205   if (g_kicked) {
206     grpc_timer_consume_kick();
207     g_kicked = false;
208   }
209 
210   gpr_mu_unlock(&g_mu);
211   return true;
212 }
213 
timer_main_loop()214 static void timer_main_loop() {
215   for (;;) {
216     grpc_millis next = GRPC_MILLIS_INF_FUTURE;
217     grpc_core::ExecCtx::Get()->InvalidateNow();
218 
219     // check timer state, updates next to the next time to run a check
220     switch (grpc_timer_check(&next)) {
221       case GRPC_TIMERS_FIRED:
222         run_some_timers();
223         break;
224       case GRPC_TIMERS_NOT_CHECKED:
225         /* This case only happens under contention, meaning more than one timer
226            manager thread checked timers concurrently.
227 
228            If that happens, we're guaranteed that some other thread has just
229            checked timers, and this will avalanche into some other thread seeing
230            empty timers and doing a timed sleep.
231 
232            Consequently, we can just sleep forever here and be happy at some
233            saved wakeup cycles. */
234         if (grpc_timer_check_trace.enabled()) {
235           gpr_log(GPR_INFO, "timers not checked: expect another thread to");
236         }
237         next = GRPC_MILLIS_INF_FUTURE;
238       /* fall through */
239       case GRPC_TIMERS_CHECKED_AND_EMPTY:
240         if (!wait_until(next)) {
241           return;
242         }
243         break;
244     }
245   }
246 }
247 
timer_thread_cleanup(completed_thread * ct)248 static void timer_thread_cleanup(completed_thread* ct) {
249   gpr_mu_lock(&g_mu);
250   // terminate the thread: drop the waiter count, thread count, and let whomever
251   // stopped the threading stuff know that we're done
252   --g_waiter_count;
253   --g_thread_count;
254   if (0 == g_thread_count) {
255     gpr_cv_signal(&g_cv_shutdown);
256   }
257   ct->next = g_completed_threads;
258   g_completed_threads = ct;
259   gpr_mu_unlock(&g_mu);
260   if (grpc_timer_check_trace.enabled()) {
261     gpr_log(GPR_INFO, "End timer thread");
262   }
263 }
264 
timer_thread(void * completed_thread_ptr)265 static void timer_thread(void* completed_thread_ptr) {
266   // this threads exec_ctx: we try to run things through to completion here
267   // since it's easy to spin up new threads
268   grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
269   timer_main_loop();
270 
271   timer_thread_cleanup(static_cast<completed_thread*>(completed_thread_ptr));
272 }
273 
start_threads(void)274 static void start_threads(void) {
275   gpr_mu_lock(&g_mu);
276   if (!g_threaded) {
277     g_threaded = true;
278     start_timer_thread_and_unlock();
279   } else {
280     gpr_mu_unlock(&g_mu);
281   }
282 }
283 
grpc_timer_manager_init(void)284 void grpc_timer_manager_init(void) {
285   gpr_mu_init(&g_mu);
286   gpr_cv_init(&g_cv_wait);
287   gpr_cv_init(&g_cv_shutdown);
288   g_threaded = false;
289   g_thread_count = 0;
290   g_waiter_count = 0;
291   g_completed_threads = nullptr;
292 
293   g_has_timed_waiter = false;
294   g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
295 
296   start_threads();
297 }
298 
stop_threads(void)299 static void stop_threads(void) {
300   gpr_mu_lock(&g_mu);
301   if (grpc_timer_check_trace.enabled()) {
302     gpr_log(GPR_INFO, "stop timer threads: threaded=%d", g_threaded);
303   }
304   if (g_threaded) {
305     g_threaded = false;
306     gpr_cv_broadcast(&g_cv_wait);
307     if (grpc_timer_check_trace.enabled()) {
308       gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
309     }
310     while (g_thread_count > 0) {
311       gpr_cv_wait(&g_cv_shutdown, &g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
312       if (grpc_timer_check_trace.enabled()) {
313         gpr_log(GPR_INFO, "num timer threads: %d", g_thread_count);
314       }
315       gc_completed_threads();
316     }
317   }
318   gpr_mu_unlock(&g_mu);
319 }
320 
grpc_timer_manager_shutdown(void)321 void grpc_timer_manager_shutdown(void) {
322   stop_threads();
323 
324   gpr_mu_destroy(&g_mu);
325   gpr_cv_destroy(&g_cv_wait);
326   gpr_cv_destroy(&g_cv_shutdown);
327 }
328 
grpc_timer_manager_set_threading(bool threaded)329 void grpc_timer_manager_set_threading(bool threaded) {
330   if (threaded) {
331     start_threads();
332   } else {
333     stop_threads();
334   }
335 }
336 
grpc_kick_poller(void)337 void grpc_kick_poller(void) {
338   gpr_mu_lock(&g_mu);
339   g_kicked = true;
340   g_has_timed_waiter = false;
341   g_timed_waiter_deadline = GRPC_MILLIS_INF_FUTURE;
342   ++g_timed_waiter_generation;
343   gpr_cv_signal(&g_cv_wait);
344   gpr_mu_unlock(&g_mu);
345 }
346