1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/subchannel_index.h"
22 
23 #include <stdbool.h>
24 #include <string.h>
25 
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/string_util.h>
28 
29 #include "src/core/lib/avl/avl.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/tls.h"
32 
33 // a map of subchannel_key --> subchannel, used for detecting connections
34 // to the same destination in order to share them
35 static grpc_avl g_subchannel_index;
36 
37 static gpr_mu g_mu;
38 
39 static gpr_refcount g_refcount;
40 
41 struct grpc_subchannel_key {
42   grpc_subchannel_args args;
43 };
44 
45 static bool g_force_creation = false;
46 
create_key(const grpc_subchannel_args * args,grpc_channel_args * (* copy_channel_args)(const grpc_channel_args * args))47 static grpc_subchannel_key* create_key(
48     const grpc_subchannel_args* args,
49     grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
50   grpc_subchannel_key* k =
51       static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
52   k->args.filter_count = args->filter_count;
53   if (k->args.filter_count > 0) {
54     k->args.filters = static_cast<const grpc_channel_filter**>(
55         gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count));
56     memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters),
57            args->filters, sizeof(*k->args.filters) * k->args.filter_count);
58   } else {
59     k->args.filters = nullptr;
60   }
61   k->args.args = copy_channel_args(args->args);
62   return k;
63 }
64 
grpc_subchannel_key_create(const grpc_subchannel_args * args)65 grpc_subchannel_key* grpc_subchannel_key_create(
66     const grpc_subchannel_args* args) {
67   return create_key(args, grpc_channel_args_normalize);
68 }
69 
subchannel_key_copy(grpc_subchannel_key * k)70 static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
71   return create_key(&k->args, grpc_channel_args_copy);
72 }
73 
grpc_subchannel_key_compare(const grpc_subchannel_key * a,const grpc_subchannel_key * b)74 int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
75                                 const grpc_subchannel_key* b) {
76   // To pretend the keys are different, return a non-zero value.
77   if (GPR_UNLIKELY(g_force_creation)) return 1;
78   int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
79   if (c != 0) return c;
80   if (a->args.filter_count > 0) {
81     c = memcmp(a->args.filters, b->args.filters,
82                a->args.filter_count * sizeof(*a->args.filters));
83     if (c != 0) return c;
84   }
85   return grpc_channel_args_compare(a->args.args, b->args.args);
86 }
87 
grpc_subchannel_key_destroy(grpc_subchannel_key * k)88 void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
89   gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters));
90   grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
91   gpr_free(k);
92 }
93 
sck_avl_destroy(void * p,void * user_data)94 static void sck_avl_destroy(void* p, void* user_data) {
95   grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p));
96 }
97 
sck_avl_copy(void * p,void * unused)98 static void* sck_avl_copy(void* p, void* unused) {
99   return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p));
100 }
101 
sck_avl_compare(void * a,void * b,void * unused)102 static long sck_avl_compare(void* a, void* b, void* unused) {
103   return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a),
104                                      static_cast<grpc_subchannel_key*>(b));
105 }
106 
scv_avl_destroy(void * p,void * user_data)107 static void scv_avl_destroy(void* p, void* user_data) {
108   GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index");
109 }
110 
scv_avl_copy(void * p,void * unused)111 static void* scv_avl_copy(void* p, void* unused) {
112   GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index");
113   return p;
114 }
115 
116 static const grpc_avl_vtable subchannel_avl_vtable = {
117     sck_avl_destroy,  // destroy_key
118     sck_avl_copy,     // copy_key
119     sck_avl_compare,  // compare_keys
120     scv_avl_destroy,  // destroy_value
121     scv_avl_copy      // copy_value
122 };
123 
grpc_subchannel_index_init(void)124 void grpc_subchannel_index_init(void) {
125   g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable);
126   gpr_mu_init(&g_mu);
127   gpr_ref_init(&g_refcount, 1);
128 }
129 
grpc_subchannel_index_shutdown(void)130 void grpc_subchannel_index_shutdown(void) {
131   // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
132   // To solve that, we should force polling to flush any pending callbacks, then
133   // shutdown safely.
134   grpc_subchannel_index_unref();
135 }
136 
grpc_subchannel_index_unref(void)137 void grpc_subchannel_index_unref(void) {
138   if (gpr_unref(&g_refcount)) {
139     gpr_mu_destroy(&g_mu);
140     grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get());
141   }
142 }
143 
grpc_subchannel_index_ref(void)144 void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
145 
grpc_subchannel_index_find(grpc_subchannel_key * key)146 grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
147   // Lock, and take a reference to the subchannel index.
148   // We don't need to do the search under a lock as avl's are immutable.
149   gpr_mu_lock(&g_mu);
150   grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
151   gpr_mu_unlock(&g_mu);
152 
153   grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
154       (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()),
155       "index_find");
156   grpc_avl_unref(index, grpc_core::ExecCtx::Get());
157 
158   return c;
159 }
160 
grpc_subchannel_index_register(grpc_subchannel_key * key,grpc_subchannel * constructed)161 grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
162                                                 grpc_subchannel* constructed) {
163   grpc_subchannel* c = nullptr;
164   bool need_to_unref_constructed = false;
165 
166   while (c == nullptr) {
167     need_to_unref_constructed = false;
168 
169     // Compare and swap loop:
170     // - take a reference to the current index
171     gpr_mu_lock(&g_mu);
172     grpc_avl index =
173         grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
174     gpr_mu_unlock(&g_mu);
175 
176     // - Check to see if a subchannel already exists
177     c = static_cast<grpc_subchannel*>(
178         grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
179     if (c != nullptr) {
180       c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
181     }
182     if (c != nullptr) {
183       // yes -> we're done
184       need_to_unref_constructed = true;
185     } else {
186       // no -> update the avl and compare/swap
187       grpc_avl updated =
188           grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()),
189                        subchannel_key_copy(key),
190                        GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"),
191                        grpc_core::ExecCtx::Get());
192 
193       // it may happen (but it's expected to be unlikely)
194       // that some other thread has changed the index:
195       // compare/swap here to check that, and retry as necessary
196       gpr_mu_lock(&g_mu);
197       if (index.root == g_subchannel_index.root) {
198         GPR_SWAP(grpc_avl, updated, g_subchannel_index);
199         c = constructed;
200       }
201       gpr_mu_unlock(&g_mu);
202 
203       grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
204     }
205     grpc_avl_unref(index, grpc_core::ExecCtx::Get());
206   }
207 
208   if (need_to_unref_constructed) {
209     GRPC_SUBCHANNEL_UNREF(constructed, "index_register");
210   }
211 
212   return c;
213 }
214 
grpc_subchannel_index_unregister(grpc_subchannel_key * key,grpc_subchannel * constructed)215 void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
216                                       grpc_subchannel* constructed) {
217   bool done = false;
218   while (!done) {
219     // Compare and swap loop:
220     // - take a reference to the current index
221     gpr_mu_lock(&g_mu);
222     grpc_avl index =
223         grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
224     gpr_mu_unlock(&g_mu);
225 
226     // Check to see if this key still refers to the previously
227     // registered subchannel
228     grpc_subchannel* c = static_cast<grpc_subchannel*>(
229         grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
230     if (c != constructed) {
231       grpc_avl_unref(index, grpc_core::ExecCtx::Get());
232       break;
233     }
234 
235     // compare and swap the update (some other thread may have
236     // mutated the index behind us)
237     grpc_avl updated =
238         grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key,
239                         grpc_core::ExecCtx::Get());
240 
241     gpr_mu_lock(&g_mu);
242     if (index.root == g_subchannel_index.root) {
243       GPR_SWAP(grpc_avl, updated, g_subchannel_index);
244       done = true;
245     }
246     gpr_mu_unlock(&g_mu);
247 
248     grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
249     grpc_avl_unref(index, grpc_core::ExecCtx::Get());
250   }
251 }
252 
grpc_subchannel_index_test_only_set_force_creation(bool force_creation)253 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
254   g_force_creation = force_creation;
255 }
256