1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/gprpp/fork.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28 
29 #include "src/core/lib/gpr/env.h"
30 #include "src/core/lib/gpr/useful.h"
31 #include "src/core/lib/gprpp/memory.h"
32 
33 /*
34  * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
35  *       AROUND VERY SPECIFIC USE CASES.
36  */
37 
38 namespace grpc_core {
39 namespace internal {
40 // The exec_ctx_count has 2 modes, blocked and unblocked.
41 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
42 // 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
43 
44 // When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
45 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
46 // meaning that BLOCKED and UNBLOCKED counts partition the integers
47 #define UNBLOCKED(n) (n + 2)
48 #define BLOCKED(n) (n)
49 
50 class ExecCtxState {
51  public:
ExecCtxState()52   ExecCtxState() : fork_complete_(true) {
53     gpr_mu_init(&mu_);
54     gpr_cv_init(&cv_);
55     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
56   }
57 
IncExecCtxCount()58   void IncExecCtxCount() {
59     gpr_atm count = gpr_atm_no_barrier_load(&count_);
60     while (true) {
61       if (count <= BLOCKED(1)) {
62         // This only occurs if we are trying to fork.  Wait until the fork()
63         // operation completes before allowing new ExecCtxs.
64         gpr_mu_lock(&mu_);
65         if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
66           while (!fork_complete_) {
67             gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
68           }
69         }
70         gpr_mu_unlock(&mu_);
71       } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
72         break;
73       }
74       count = gpr_atm_no_barrier_load(&count_);
75     }
76   }
77 
DecExecCtxCount()78   void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
79 
BlockExecCtx()80   bool BlockExecCtx() {
81     // Assumes there is an active ExecCtx when this function is called
82     if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
83       gpr_mu_lock(&mu_);
84       fork_complete_ = false;
85       gpr_mu_unlock(&mu_);
86       return true;
87     }
88     return false;
89   }
90 
AllowExecCtx()91   void AllowExecCtx() {
92     gpr_mu_lock(&mu_);
93     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
94     fork_complete_ = true;
95     gpr_cv_broadcast(&cv_);
96     gpr_mu_unlock(&mu_);
97   }
98 
~ExecCtxState()99   ~ExecCtxState() {
100     gpr_mu_destroy(&mu_);
101     gpr_cv_destroy(&cv_);
102   }
103 
104  private:
105   bool fork_complete_;
106   gpr_mu mu_;
107   gpr_cv cv_;
108   gpr_atm count_;
109 };
110 
111 class ThreadState {
112  public:
ThreadState()113   ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
114     gpr_mu_init(&mu_);
115     gpr_cv_init(&cv_);
116   }
117 
IncThreadCount()118   void IncThreadCount() {
119     gpr_mu_lock(&mu_);
120     count_++;
121     gpr_mu_unlock(&mu_);
122   }
123 
DecThreadCount()124   void DecThreadCount() {
125     gpr_mu_lock(&mu_);
126     count_--;
127     if (awaiting_threads_ && count_ == 0) {
128       threads_done_ = true;
129       gpr_cv_signal(&cv_);
130     }
131     gpr_mu_unlock(&mu_);
132   }
AwaitThreads()133   void AwaitThreads() {
134     gpr_mu_lock(&mu_);
135     awaiting_threads_ = true;
136     threads_done_ = (count_ == 0);
137     while (!threads_done_) {
138       gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
139     }
140     awaiting_threads_ = true;
141     gpr_mu_unlock(&mu_);
142   }
143 
~ThreadState()144   ~ThreadState() {
145     gpr_mu_destroy(&mu_);
146     gpr_cv_destroy(&cv_);
147   }
148 
149  private:
150   bool awaiting_threads_;
151   bool threads_done_;
152   gpr_mu mu_;
153   gpr_cv cv_;
154   int count_;
155 };
156 
157 }  // namespace
158 
GlobalInit()159 void Fork::GlobalInit() {
160   if (!override_enabled_) {
161 #ifdef GRPC_ENABLE_FORK_SUPPORT
162     support_enabled_ = true;
163 #else
164     support_enabled_ = false;
165 #endif
166     bool env_var_set = false;
167     char* env = gpr_getenv("GRPC_ENABLE_FORK_SUPPORT");
168     if (env != nullptr) {
169       static const char* truthy[] = {"yes",  "Yes",  "YES", "true",
170                                      "True", "TRUE", "1"};
171       static const char* falsey[] = {"no",    "No",    "NO", "false",
172                                      "False", "FALSE", "0"};
173       for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
174         if (0 == strcmp(env, truthy[i])) {
175           support_enabled_ = true;
176           env_var_set = true;
177           break;
178         }
179       }
180       if (!env_var_set) {
181         for (size_t i = 0; i < GPR_ARRAY_SIZE(falsey); i++) {
182           if (0 == strcmp(env, falsey[i])) {
183             support_enabled_ = false;
184             env_var_set = true;
185             break;
186           }
187         }
188       }
189       gpr_free(env);
190     }
191   }
192   if (support_enabled_) {
193     exec_ctx_state_ = grpc_core::New<internal::ExecCtxState>();
194     thread_state_ = grpc_core::New<internal::ThreadState>();
195   }
196 }
197 
GlobalShutdown()198 void Fork::GlobalShutdown() {
199   if (support_enabled_) {
200     grpc_core::Delete(exec_ctx_state_);
201     grpc_core::Delete(thread_state_);
202   }
203 }
204 
Enabled()205 bool Fork::Enabled() { return support_enabled_; }
206 
207 // Testing Only
Enable(bool enable)208 void Fork::Enable(bool enable) {
209   override_enabled_ = true;
210   support_enabled_ = enable;
211 }
212 
IncExecCtxCount()213 void Fork::IncExecCtxCount() {
214   if (support_enabled_) {
215     exec_ctx_state_->IncExecCtxCount();
216   }
217 }
218 
DecExecCtxCount()219 void Fork::DecExecCtxCount() {
220   if (support_enabled_) {
221     exec_ctx_state_->DecExecCtxCount();
222   }
223 }
224 
SetResetChildPollingEngineFunc(Fork::child_postfork_func reset_child_polling_engine)225 void Fork::SetResetChildPollingEngineFunc(
226     Fork::child_postfork_func reset_child_polling_engine) {
227   reset_child_polling_engine_ = reset_child_polling_engine;
228 }
GetResetChildPollingEngineFunc()229 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
230   return reset_child_polling_engine_;
231 }
232 
BlockExecCtx()233 bool Fork::BlockExecCtx() {
234   if (support_enabled_) {
235     return exec_ctx_state_->BlockExecCtx();
236   }
237   return false;
238 }
239 
AllowExecCtx()240 void Fork::AllowExecCtx() {
241   if (support_enabled_) {
242     exec_ctx_state_->AllowExecCtx();
243   }
244 }
245 
IncThreadCount()246 void Fork::IncThreadCount() {
247   if (support_enabled_) {
248     thread_state_->IncThreadCount();
249   }
250 }
251 
DecThreadCount()252 void Fork::DecThreadCount() {
253   if (support_enabled_) {
254     thread_state_->DecThreadCount();
255   }
256 }
AwaitThreads()257 void Fork::AwaitThreads() {
258   if (support_enabled_) {
259     thread_state_->AwaitThreads();
260   }
261 }
262 
263 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
264 internal::ThreadState* Fork::thread_state_ = nullptr;
265 bool Fork::support_enabled_ = false;
266 bool Fork::override_enabled_ = false;
267 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
268 }  // namespace grpc_core
269