/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
typedef struct {
  grpc_resource_user* next;
  grpc_resource_user* prev;
} grpc_resource_user_link;

/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;

struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non empty free pool under the resource quota combiner
     lock */
  grpc_closure add_to_free_pool_closure;

  /* one ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* has this resource user been shut down? starts at 0, increases for each
     shutdown call */
  gpr_atm shutdown;

  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the quota
     can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - i.e. when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if not */
  bool allocating;
  /* How many bytes of allocations are outstanding */
  int64_t outstanding_allocations;
  /* True if we have already scheduled ourselves to be added to the quota's
     non-empty free pool list, false otherwise */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive
     reclaimer */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to de-register and shutdown the
     resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  char* name;
};

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;

  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided this mutex; but in that case, each invocation of
   * grpc_resource_user_allocate_threads() would have had to do at least two
   * atomic loads (for max_threads and num_threads_allocated) followed by a CAS
   * (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to often be
   * called concurrently, increasing the chances of failing the CAS operation.
   * This additional complexity is not worth the tiny perf gain we may (or may
   * not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  char* name;
};

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}
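
/* Illustration (not part of the implementation): these lists are circular,
   doubly-linked, and intrusive. A minimal standalone sketch of the same
   head-insertion scheme, using a hypothetical node type with a single
   next/prev pair instead of the per-list links array:

     struct node { node* next; node* prev; };

     void add_head(node** root, node* n) {
       if (*root == nullptr) {
         *root = n;
         n->next = n->prev = n;  // singleton list: points at itself both ways
       } else {
         n->next = *root;            // new head sits before the old head
         n->prev = (*root)->prev;    // and after the tail (old head's prev)
         n->next->prev = n;
         n->prev->next = n;
         *root = n;
       }
     }
*/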

static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = (*root)->links[list].next;
    resource_user->links[list].prev = *root;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);

static void rq_step(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
}

/* update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}
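
/* Worked example of the estimate above (illustrative numbers, not from the
   source): with size = 100 MiB and free_pool = 25 MiB, the usage fraction is
   1.0 - 25/100 = 0.75, so memory_usage_estimation = 0.75 * 65536 = 49152.
   grpc_resource_quota_get_memory_pressure() later divides by
   MEMORY_USAGE_ESTIMATION_MAX and reports 0.75. A negative free_pool (more
   memory requested than available) clamps to MEMORY_USAGE_ESTIMATION_MAX,
   i.e. a reported pressure of 1.0. */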

/* returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (grpc_resource_quota_trace.enabled()) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
    } else if (grpc_resource_quota_trace.enabled() &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name, resource_user->name);
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* returns true if any memory could be reclaimed from buffers */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation", resource_quota->name,
            resource_user->name, destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

typedef struct {
  grpc_slice_refcount base;
  gpr_refcount refs;
  grpc_resource_user* resource_user;
  size_t size;
} ru_slice_refcount;

static void ru_slice_ref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  gpr_ref(&rc->refs);
}

static void ru_slice_unref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  if (gpr_unref(&rc->refs)) {
    grpc_resource_user_free(rc->resource_user, rc->size);
    gpr_free(rc);
  }
}

static const grpc_slice_refcount_vtable ru_slice_vtable = {
    ru_slice_ref, ru_slice_unref, grpc_slice_default_eq_impl,
    grpc_slice_default_hash_impl};

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(
      gpr_malloc(sizeof(ru_slice_refcount) + size));
  rc->base.vtable = &ru_slice_vtable;
  rc->base.sub_refcount = &rc->base;
  gpr_ref_init(&rc->refs, 1);
  rc->resource_user = resource_user;
  rc->size = size;
  grpc_slice slice;
  slice.refcount = &rc->base;
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}
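
/* Layout note (illustrative): ru_slice_create makes a single allocation that
   holds the refcount header immediately followed by the payload, so the slice
   data begins at rc + 1:

     [ ru_slice_refcount | size bytes of payload ]
     ^ rc                ^ slice.data.refcounted.bytes

   When the last reference is dropped, ru_slice_unref returns `size` bytes to
   the resource user and releases the whole block with one gpr_free. */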

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

static void ru_allocate(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

static void ru_shutdown(void* ru, grpc_error* error) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

static void ru_destroy(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  gpr_free(resource_user->name);
  gpr_free(resource_user);
}

static void ru_allocated_slices(void* arg, grpc_error* error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) {
    for (size_t i = 0; i < slice_allocator->count; i++) {
      grpc_slice_buffer_add_indexed(
          slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                                 slice_allocator->length));
    }
  }
  GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

typedef struct {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
} rq_resize_args;

static void rq_resize(void* args, grpc_error* error) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota =
      static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
                 (intptr_t)resource_quota);
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    grpc_combiner_finally_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota,
                    grpc_combiner_scheduler(resource_quota->combiner));
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}
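
/* Usage sketch (illustrative, not part of this file): a quota starts out
   effectively unbounded (INT64_MAX) and is typically resized to a real limit
   right away:

     grpc_resource_quota* q = grpc_resource_quota_create("my_pool");
     grpc_resource_quota_resize(q, 64 * 1024 * 1024);  // cap at 64 MiB
     // ... attach to servers/channels via channel args ...
     grpc_resource_quota_unref(q);
*/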

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_free(resource_quota->name);
    gpr_free(resource_quota);
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_user channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return grpc_resource_quota_create(nullptr);
}

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}
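
/* Usage sketch (illustrative): attaching a quota to a channel via channel
   args, which grpc_resource_quota_from_channel_args above will then pick up:

     grpc_arg arg;
     arg.type = GRPC_ARG_POINTER;
     arg.key = const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA);
     arg.value.pointer.p = quota;  // a grpc_resource_quota*
     arg.value.pointer.vtable = grpc_resource_quota_arg_vtable();
     grpc_channel_args args = {1, &arg};
*/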

/*******************************************************************************
 * grpc_resource_user api
 */

grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user =
      static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
                 (intptr_t)resource_user);
  }
  return resource_user;
}

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(
            ru_shutdown, resource_user,
            grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
        GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated (rq threads: "
            "%d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}
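
/* Usage sketch (illustrative): a thread pool consults the quota before
   spawning and returns its allocation when a worker exits:

     if (grpc_resource_user_allocate_threads(ru, 1)) {
       // spawn the worker; when the worker exits:
       grpc_resource_user_free_threads(ru, 1);
     } else {
       // over quota: defer or reject the work instead of spawning
     }
*/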

void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  gpr_mu_lock(&resource_user->mu);
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  resource_user->outstanding_allocations += static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  if (resource_user->free_pool < 0) {
    grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                             GRPC_ERROR_NONE);
    if (!resource_user->allocating) {
      resource_user->allocating = true;
      GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
    }
  } else {
    resource_user->outstanding_allocations -= static_cast<int64_t>(size);
    GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
}
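
/* Note on pairing (illustrative): each grpc_resource_user_alloc(ru, n, done)
   takes n refs on the resource user (one per byte) and must eventually be
   matched by grpc_resource_user_free(ru, n), which releases them. If the
   user's cached free_pool already covers the request, `done` is scheduled
   immediately; otherwise it is queued on on_allocated until the quota grants
   the bytes. */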

void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
                       GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = closure;
  GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
                     GRPC_ERROR_NONE);
}
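
/* Usage sketch (illustrative, with hypothetical names `ru` and
   `reclaim_closure`): a caller posts a benign reclaimer whose callback drops
   cached buffers, then signals completion so the quota can keep reclaiming:

     static void benign_reclaim(void* arg, grpc_error* error) {
       if (error == GRPC_ERROR_NONE) {
         // drop caches, grpc_resource_user_free() the released bytes ...
         grpc_resource_user_finish_reclamation(ru);
       }
       // GRPC_ERROR_CANCELLED means the resource user shut down first;
       // finish_reclamation must not be called in that case.
     }
     grpc_resource_user_post_reclaimer(ru, false, &reclaim_closure);  // benign
*/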

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name, resource_user->name);
  }
  GRPC_CLOSURE_SCHED(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

void grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                           &slice_allocator->on_allocated);
}
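
/* Usage sketch (illustrative): a reader initializes the slice allocator once,
   then requests quota-backed read buffers. `on_read_buffers_ready`, `state`,
   and `incoming` are hypothetical; the callback fires once the slices have
   been added to `incoming`:

     grpc_resource_user_slice_allocator alloc;
     grpc_resource_user_slice_allocator_init(&alloc, ru,
                                             on_read_buffers_ready, state);
     grpc_resource_user_alloc_slices(&alloc, 8192, 4, &incoming);  // 4 x 8 KiB
*/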
949