/*	$NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $	*/

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
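	/* For example (hypothetical values): if last_updated is 0xfffffffe
	 * and the tick counter has since wrapped around to 1, the unsigned
	 * subtraction below yields n_ticks == 3, the elapsed count we want. */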
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/
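	/* The rearrangement is sound because, for the integer ranges in
	 * play here,
	 *     limit + n_ticks * rate > maximum
	 * holds exactly when
	 *     (maximum - limit) / n_ticks < rate
	 * does, and the latter form cannot overflow.  A sketch with
	 * made-up numbers: maximum = 100, limit = 40, n_ticks = 3,
	 * rate = 30 gives (100 - 40) / 3 == 20 < 30, so we clamp the
	 * bucket to 100 instead of computing 40 + 90 == 130. */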

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */
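	/* As a made-up example: with tv = { 2, 500000 } (2.5 seconds) and
	 * cfg->msec_per_tick == 100, msec comes out to 2500 and the
	 * function returns tick number 25. */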

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
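
/* Illustrative usage sketch (not part of this file; error handling
 * abbreviated, rates made up): allow 256 KiB/sec with 512 KiB bursts in
 * each direction, using the default one-second tick, then apply the
 * config to a bufferevent:
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(256*1024, 512*1024,
 *		256*1024, 512*1024, NULL);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * Note that bufferevent_set_rate_limit() stores the cfg pointer rather
 * than copying it, so the cfg must outlive every bufferevent that uses
 * it; free it afterwards with ev_token_bucket_cfg_free(). */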

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far =
	    is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */
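	/* Hypothetical illustration: with a per-bufferevent read limit of
	 * 5000 bytes, a group read limit of 80000 bytes shared by 100
	 * members (an 800-byte share, above the default 64-byte min_share),
	 * and max_single_read at its 16384 default, this function returns
	 * MIN(16384, 5000, 800) == 800. */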

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.  (For example, with members A,B,C,D and a random start
    at C, the two loops below visit C, D, then A, B.)

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
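
/* Usage sketch (illustrative only): passing a new cfg replaces the
 * current one, and passing NULL removes rate limiting altogether:
 *
 *	bufferevent_set_rate_limit(bev, new_cfg);
 *	...
 *	bufferevent_set_rate_limit(bev, NULL);
 *
 * Neither call frees the previous cfg (note the reference-counting XXX
 * above), so the caller must eventually release it with
 * ev_token_bucket_cfg_free() once no bufferevent still uses it. */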

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
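
/* Group usage sketch (illustrative; error handling omitted): share one
 * aggregate limit among several bufferevents:
 *
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 *
 * Unlike bufferevent_set_rate_limit(), the group copies cfg, so the
 * caller may free cfg right away.  Remove every member with
 * bufferevent_remove_from_rate_limit_group() before calling
 * bufferevent_rate_limit_group_free(). */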

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

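/* Worked example with made-up numbers: if a group refills 10000 bytes of
 * read capacity per tick and has 500 members, each member's computed
 * share would be only 20 bytes; with min_share at its default of 64,
 * each member may instead read up to 64 bytes per tick, trading strict
 * fairness for fewer uselessly tiny reads. */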
int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't allow the min_share to exceed the one-tick refill amount.
	 * IOW, at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}
