1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/call_combiner.h"
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/log.h>
26 #include "src/core/lib/debug/stats.h"
27 #include "src/core/lib/profiling/timers.h"
28 
29 grpc_core::TraceFlag grpc_call_combiner_trace(false, "call_combiner");
30 
decode_cancel_state_error(gpr_atm cancel_state)31 static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
32   if (cancel_state & 1) {
33     return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
34   }
35   return GRPC_ERROR_NONE;
36 }
37 
encode_cancel_state_error(grpc_error * error)38 static gpr_atm encode_cancel_state_error(grpc_error* error) {
39   return static_cast<gpr_atm>(1) | (gpr_atm)error;
40 }
41 
grpc_call_combiner_init(grpc_call_combiner * call_combiner)42 void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
43   gpr_mpscq_init(&call_combiner->queue);
44 }
45 
grpc_call_combiner_destroy(grpc_call_combiner * call_combiner)46 void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
47   gpr_mpscq_destroy(&call_combiner->queue);
48   GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
49 }
50 
51 #ifndef NDEBUG
52 #define DEBUG_ARGS , const char *file, int line
53 #define DEBUG_FMT_STR "%s:%d: "
54 #define DEBUG_FMT_ARGS , file, line
55 #else
56 #define DEBUG_ARGS
57 #define DEBUG_FMT_STR
58 #define DEBUG_FMT_ARGS
59 #endif
60 
grpc_call_combiner_start(grpc_call_combiner * call_combiner,grpc_closure * closure,grpc_error * error DEBUG_ARGS,const char * reason)61 void grpc_call_combiner_start(grpc_call_combiner* call_combiner,
62                               grpc_closure* closure,
63                               grpc_error* error DEBUG_ARGS,
64                               const char* reason) {
65   GPR_TIMER_SCOPE("call_combiner_start", 0);
66   if (grpc_call_combiner_trace.enabled()) {
67     gpr_log(GPR_INFO,
68             "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
69             "%s] error=%s",
70             call_combiner, closure DEBUG_FMT_ARGS, reason,
71             grpc_error_string(error));
72   }
73   size_t prev_size = static_cast<size_t>(
74       gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1));
75   if (grpc_call_combiner_trace.enabled()) {
76     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
77             prev_size + 1);
78   }
79   GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
80   if (prev_size == 0) {
81     GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
82 
83     GPR_TIMER_MARK("call_combiner_initiate", 0);
84     if (grpc_call_combiner_trace.enabled()) {
85       gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
86     }
87     // Queue was empty, so execute this closure immediately.
88     GRPC_CLOSURE_SCHED(closure, error);
89   } else {
90     if (grpc_call_combiner_trace.enabled()) {
91       gpr_log(GPR_INFO, "  QUEUING");
92     }
93     // Queue was not empty, so add closure to queue.
94     closure->error_data.error = error;
95     gpr_mpscq_push(&call_combiner->queue,
96                    reinterpret_cast<gpr_mpscq_node*>(closure));
97   }
98 }
99 
grpc_call_combiner_stop(grpc_call_combiner * call_combiner DEBUG_ARGS,const char * reason)100 void grpc_call_combiner_stop(grpc_call_combiner* call_combiner DEBUG_ARGS,
101                              const char* reason) {
102   GPR_TIMER_SCOPE("call_combiner_stop", 0);
103   if (grpc_call_combiner_trace.enabled()) {
104     gpr_log(GPR_INFO,
105             "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
106             call_combiner DEBUG_FMT_ARGS, reason);
107   }
108   size_t prev_size = static_cast<size_t>(
109       gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1));
110   if (grpc_call_combiner_trace.enabled()) {
111     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
112             prev_size - 1);
113   }
114   GPR_ASSERT(prev_size >= 1);
115   if (prev_size > 1) {
116     while (true) {
117       if (grpc_call_combiner_trace.enabled()) {
118         gpr_log(GPR_INFO, "  checking queue");
119       }
120       bool empty;
121       grpc_closure* closure = reinterpret_cast<grpc_closure*>(
122           gpr_mpscq_pop_and_check_end(&call_combiner->queue, &empty));
123       if (closure == nullptr) {
124         // This can happen either due to a race condition within the mpscq
125         // code or because of a race with grpc_call_combiner_start().
126         if (grpc_call_combiner_trace.enabled()) {
127           gpr_log(GPR_INFO, "  queue returned no result; checking again");
128         }
129         continue;
130       }
131       if (grpc_call_combiner_trace.enabled()) {
132         gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
133                 closure, grpc_error_string(closure->error_data.error));
134       }
135       GRPC_CLOSURE_SCHED(closure, closure->error_data.error);
136       break;
137     }
138   } else if (grpc_call_combiner_trace.enabled()) {
139     gpr_log(GPR_INFO, "  queue empty");
140   }
141 }
142 
grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner * call_combiner,grpc_closure * closure)143 void grpc_call_combiner_set_notify_on_cancel(grpc_call_combiner* call_combiner,
144                                              grpc_closure* closure) {
145   GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
146   while (true) {
147     // Decode original state.
148     gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
149     grpc_error* original_error = decode_cancel_state_error(original_state);
150     // If error is set, invoke the cancellation closure immediately.
151     // Otherwise, store the new closure.
152     if (original_error != GRPC_ERROR_NONE) {
153       if (grpc_call_combiner_trace.enabled()) {
154         gpr_log(GPR_INFO,
155                 "call_combiner=%p: scheduling notify_on_cancel callback=%p "
156                 "for pre-existing cancellation",
157                 call_combiner, closure);
158       }
159       GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(original_error));
160       break;
161     } else {
162       if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
163                            (gpr_atm)closure)) {
164         if (grpc_call_combiner_trace.enabled()) {
165           gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
166                   call_combiner, closure);
167         }
168         // If we replaced an earlier closure, invoke the original
169         // closure with GRPC_ERROR_NONE.  This allows callers to clean
170         // up any resources they may be holding for the callback.
171         if (original_state != 0) {
172           closure = (grpc_closure*)original_state;
173           if (grpc_call_combiner_trace.enabled()) {
174             gpr_log(GPR_INFO,
175                     "call_combiner=%p: scheduling old cancel callback=%p",
176                     call_combiner, closure);
177           }
178           GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
179         }
180         break;
181       }
182     }
183     // cas failed, try again.
184   }
185 }
186 
grpc_call_combiner_cancel(grpc_call_combiner * call_combiner,grpc_error * error)187 void grpc_call_combiner_cancel(grpc_call_combiner* call_combiner,
188                                grpc_error* error) {
189   GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
190   while (true) {
191     gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
192     grpc_error* original_error = decode_cancel_state_error(original_state);
193     if (original_error != GRPC_ERROR_NONE) {
194       GRPC_ERROR_UNREF(error);
195       break;
196     }
197     if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
198                          encode_cancel_state_error(error))) {
199       if (original_state != 0) {
200         grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
201         if (grpc_call_combiner_trace.enabled()) {
202           gpr_log(GPR_INFO,
203                   "call_combiner=%p: scheduling notify_on_cancel callback=%p",
204                   call_combiner, notify_on_cancel);
205         }
206         GRPC_CLOSURE_SCHED(notify_on_cancel, GRPC_ERROR_REF(error));
207       }
208       break;
209     }
210     // cas failed, try again.
211   }
212 }
213