/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
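
/* Worked illustration (numbers assumed for exposition, not from the
 * original source): with read_rate = 1000 bytes/tick, read_maximum = 8000
 * bytes, read_limit = 5000, and n_ticks = 4, the guard above computes
 * (8000 - 5000) / 4 = 750 < 1000, so the naive sum 5000 + 4*1000 = 9000
 * would overshoot the maximum and we clamp to 8000 instead.  Because the
 * clamp test divides rather than multiplies, it cannot overflow even for
 * very large n_ticks. */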

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
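
/* Worked illustration (numbers assumed for exposition): with a 100-msec
 * tick (msec_per_tick = 100), a cached time of tv_sec = 10 and
 * tv_usec = 250000 gives msec = 10*1000 + 250000/1000 = 10250, so this
 * returns tick number 10250 / 100 = 102. */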

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
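
#if 0
/* Example (illustrative sketch added for exposition, not part of the
 * original file): create a config limiting a bufferevent to roughly 4096
 * bytes/sec in each direction with bursts of up to 16384 bytes, using the
 * default one-second tick.  Note that bufferevent_set_rate_limit() below
 * does not copy or reference-count the cfg (see the XXX there), so the
 * cfg must outlive the limit. */
static int
example_limit_one_bufferevent(struct bufferevent *bev)
{
	struct ev_token_bucket_cfg *cfg =
	    ev_token_bucket_cfg_new(4096, 16384, 4096, 16384, NULL);
	if (!cfg)
		return -1;
	return bufferevent_set_rate_limit(bev, cfg);
}
#endif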

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
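
#if 0
/* Example (illustrative sketch added for exposition, not part of the
 * original file): passing a NULL cfg removes an existing limit; once no
 * bufferevent still uses the old cfg, it can be freed. */
static void
example_clear_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	bufferevent_set_rate_limit(bev, NULL);
	ev_token_bucket_cfg_free(cfg);
}
#endif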

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
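
#if 0
/* Example (illustrative sketch added for exposition, not part of the
 * original file): share roughly 32768 bytes/sec of read and write
 * capacity among two bufferevents.  Unlike the per-bufferevent case, the
 * group copies the cfg (see the memcpy above), so it can be freed right
 * away. */
static struct bufferevent_rate_limit_group *
example_make_shared_limit(struct event_base *base,
    struct bufferevent *bev1, struct bufferevent *bev2)
{
	struct bufferevent_rate_limit_group *grp;
	struct ev_token_bucket_cfg *cfg =
	    ev_token_bucket_cfg_new(32768, 32768, 32768, 32768, NULL);
	if (!cfg)
		return NULL;
	grp = bufferevent_rate_limit_group_new(base, cfg);
	ev_token_bucket_cfg_free(cfg);
	if (grp) {
		bufferevent_add_to_rate_limit_group(bev1, grp);
		bufferevent_add_to_rate_limit_group(bev2, grp);
	}
	return grp;
}
#endif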

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
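
/* Illustration (numbers assumed for exposition): with a group read limit
 * of 64000 bytes per tick and 10000 members, the naive per-member share
 * in _bufferevent_get_rlim_max() would be just 6 bytes; the default
 * min_share of 64 (set in bufferevent_rate_limit_group_new() above) lets
 * each member read at least 64 bytes once the group bucket refills. */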

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}


/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
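
/* Usage note (example added for exposition; "bev" is assumed to have a
 * per-bufferevent limit configured): an application that transfers bytes
 * outside Libevent's view can charge them against the read bucket by
 * hand, e.g.
 *
 *	r = bufferevent_decrement_read_limit(bev, n_extra_bytes);
 *
 * where a negative r means the refill timer could not be scheduled. */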

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}