/*	$NetBSD: bufferevent_ratelim.c,v 1.6 2021/04/10 19:18:45 rillig Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
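
/* Worked example of the overflow-safe refill above (illustrative numbers,
 * not taken from any real configuration): suppose cfg->read_rate = 1000,
 * cfg->read_maximum = 4000, bucket->read_limit = 3500, and n_ticks = 2.
 * The naive refill would give 3500 + 2*1000 = 5500, past the maximum; here
 * we instead check (4000 - 3500) / 2 = 250 < 1000 and clamp directly to the
 * 4000-byte maximum, never computing an intermediate value that could
 * overflow.
 */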

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
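
/* Example (illustrative): with tv = { .tv_sec = 5, .tv_usec = 250000 } and
 * cfg->msec_per_tick = 100, we get msec = 5 * 1000 + 250000 / 1000 = 5250,
 * so this function returns tick number 5250 / 100 = 52.
 */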

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
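
/* Usage sketch (illustrative; "bev" is a hypothetical caller-owned
 * bufferevent, and error handling is abbreviated): limit a single
 * bufferevent to about 1000 bytes/sec each way with 10000-byte bursts,
 * refilling once per second (the default tick when tick_len is NULL):
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1000, 10000, 1000, 10000, NULL);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * Note that bufferevent_set_rate_limit() keeps a pointer to cfg (see the
 * "XXX reference-count cfg" note there), so free it only after clearing the
 * limit with bufferevent_set_rate_limit(bev, NULL).
 */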

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for the max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if not.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ?
	    bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
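
/* Illustrative example of the clamping above: if bev->max_single_read is
 * 16384, the bufferevent's own read bucket holds 4000 bytes, and its group
 * currently allows a 1200-byte per-member share, the helper returns 1200,
 * the smallest of the three.
 */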

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting until the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
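
/* Usage sketch (illustrative; "base", "bev1", and "bev2" are hypothetical
 * caller variables): share one 64 KB/sec budget among several bufferevents.
 * Unlike the per-bufferevent case, the group copies the cfg (see the
 * memcpy above), so the cfg may be freed as soon as the group exists:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	ev_token_bucket_cfg_free(cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 */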

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
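
/* Example (illustrative): if the group's read_rate is 48 bytes per tick, a
 * requested min share of 64 is clamped down to 48, so that at steady state
 * at least one member can still make progress every tick.
 */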

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
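
/* Usage sketch (illustrative; "bev", "grp_a", and "grp_b" are hypothetical
 * caller variables): move a bufferevent between groups.  Adding a
 * bufferevent that already belongs to another group implicitly removes it
 * from the old one first, as the check above shows:
 *
 *	bufferevent_add_to_rate_limit_group(bev, grp_a);
 *	bufferevent_add_to_rate_limit_group(bev, grp_b);  // leaves grp_a
 *	bufferevent_remove_from_rate_limit_group(bev);    // leaves grp_b
 */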

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
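
/* Usage sketch (illustrative; "bev" is a hypothetical bufferevent that
 * already has a rate limit configured, as the assertion above requires):
 * charge 512 bytes of out-of-band traffic against the write bucket.  The
 * call suspends writing and schedules a refill if the bucket goes
 * non-positive, and returns -1 only if adding the refill event fails:
 *
 *	if (bufferevent_decrement_write_limit(bev, 512) < 0) {
 *		// hypothetical handling: the refill event could not be added
 *	}
 */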

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}