bufferevent_ratelim.c revision 290001
1/*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 *    derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28#include "evconfig-private.h"
29
30#include <sys/types.h>
31#include <limits.h>
32#include <string.h>
33#include <stdlib.h>
34
35#include "event2/event.h"
36#include "event2/event_struct.h"
37#include "event2/util.h"
38#include "event2/bufferevent.h"
39#include "event2/bufferevent_struct.h"
40#include "event2/buffer.h"
41
42#include "ratelim-internal.h"
43
44#include "bufferevent-internal.h"
45#include "mm-internal.h"
46#include "util-internal.h"
47#include "event-internal.h"
48
49int
50ev_token_bucket_init_(struct ev_token_bucket *bucket,
51    const struct ev_token_bucket_cfg *cfg,
52    ev_uint32_t current_tick,
53    int reinitialize)
54{
55	if (reinitialize) {
56		/* on reinitialization, we only clip downwards, since we've
57		   already used who-knows-how-much bandwidth this tick.  We
58		   leave "last_updated" as it is; the next update will add the
59		   appropriate amount of bandwidth to the bucket.
60		*/
61		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62			bucket->read_limit = cfg->read_maximum;
63		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64			bucket->write_limit = cfg->write_maximum;
65	} else {
66		bucket->read_limit = cfg->read_rate;
67		bucket->write_limit = cfg->write_rate;
68		bucket->last_updated = current_tick;
69	}
70	return 0;
71}
72
73int
74ev_token_bucket_update_(struct ev_token_bucket *bucket,
75    const struct ev_token_bucket_cfg *cfg,
76    ev_uint32_t current_tick)
77{
78	/* It's okay if the tick number overflows, since we'll just
79	 * wrap around when we do the unsigned substraction. */
80	unsigned n_ticks = current_tick - bucket->last_updated;
81
82	/* Make sure some ticks actually happened, and that time didn't
83	 * roll back. */
84	if (n_ticks == 0 || n_ticks > INT_MAX)
85		return 0;
86
87	/* Naively, we would say
88		bucket->limit += n_ticks * cfg->rate;
89
90		if (bucket->limit > cfg->maximum)
91			bucket->limit = cfg->maximum;
92
93	   But we're worried about overflow, so we do it like this:
94	*/
95
96	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97		bucket->read_limit = cfg->read_maximum;
98	else
99		bucket->read_limit += n_ticks * cfg->read_rate;
100
101
102	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103		bucket->write_limit = cfg->write_maximum;
104	else
105		bucket->write_limit += n_ticks * cfg->write_rate;
106
107
108	bucket->last_updated = current_tick;
109
110	return 1;
111}
112
113static inline void
114bufferevent_update_buckets(struct bufferevent_private *bev)
115{
116	/* Must hold lock on bev. */
117	struct timeval now;
118	unsigned tick;
119	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121	if (tick != bev->rate_limiting->limit.last_updated)
122		ev_token_bucket_update_(&bev->rate_limiting->limit,
123		    bev->rate_limiting->cfg, tick);
124}
125
126ev_uint32_t
127ev_token_bucket_get_tick_(const struct timeval *tv,
128    const struct ev_token_bucket_cfg *cfg)
129{
130	/* This computation uses two multiplies and a divide.  We could do
131	 * fewer if we knew that the tick length was an integer number of
132	 * seconds, or if we knew it divided evenly into a second.  We should
133	 * investigate that more.
134	 */
135
136	/* We cast to an ev_uint64_t first, since we don't want to overflow
137	 * before we do the final divide. */
138	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139	return (unsigned)(msec / cfg->msec_per_tick);
140}
141
142struct ev_token_bucket_cfg *
143ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144    size_t write_rate, size_t write_burst,
145    const struct timeval *tick_len)
146{
147	struct ev_token_bucket_cfg *r;
148	struct timeval g;
149	if (! tick_len) {
150		g.tv_sec = 1;
151		g.tv_usec = 0;
152		tick_len = &g;
153	}
154	if (read_rate > read_burst || write_rate > write_burst ||
155	    read_rate < 1 || write_rate < 1)
156		return NULL;
157	if (read_rate > EV_RATE_LIMIT_MAX ||
158	    write_rate > EV_RATE_LIMIT_MAX ||
159	    read_burst > EV_RATE_LIMIT_MAX ||
160	    write_burst > EV_RATE_LIMIT_MAX)
161		return NULL;
162	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163	if (!r)
164		return NULL;
165	r->read_rate = read_rate;
166	r->write_rate = write_rate;
167	r->read_maximum = read_burst;
168	r->write_maximum = write_burst;
169	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170	r->msec_per_tick = (tick_len->tv_sec * 1000) +
171	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172	return r;
173}
174
/* Release a configuration previously returned by
 * ev_token_bucket_cfg_new().  The memory goes back through mm_free(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	void *mem = cfg;
	mm_free(mem);
}
180
/* Fallback values for max_single_read / max_single_write.  They are used
 * at initialization and whenever a caller asks for 0 or an out-of-range
 * size. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock that protects a rate-limit group. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/resume helpers, called with the group lock held. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192
193/** Helper: figure out the maximum amount we should write if is_write, or
194    the maximum amount we should read if is_read.  Return that maximum, or
195    0 if our bucket is wholly exhausted.
196 */
197static inline ev_ssize_t
198bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
199{
200	/* needs lock on bev. */
201	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
202
203#define LIM(x)						\
204	(is_write ? (x).write_limit : (x).read_limit)
205
206#define GROUP_SUSPENDED(g)			\
207	(is_write ? (g)->write_suspended : (g)->read_suspended)
208
209	/* Sets max_so_far to MIN(x, max_so_far) */
210#define CLAMPTO(x)				\
211	do {					\
212		if (max_so_far > (x))		\
213			max_so_far = (x);	\
214	} while (0);
215
216	if (!bev->rate_limiting)
217		return max_so_far;
218
219	/* If rate-limiting is enabled at all, update the appropriate
220	   bucket, and take the smaller of our rate limit and the group
221	   rate limit.
222	 */
223
224	if (bev->rate_limiting->cfg) {
225		bufferevent_update_buckets(bev);
226		max_so_far = LIM(bev->rate_limiting->limit);
227	}
228	if (bev->rate_limiting->group) {
229		struct bufferevent_rate_limit_group *g =
230		    bev->rate_limiting->group;
231		ev_ssize_t share;
232		LOCK_GROUP(g);
233		if (GROUP_SUSPENDED(g)) {
234			/* We can get here if we failed to lock this
235			 * particular bufferevent while suspending the whole
236			 * group. */
237			if (is_write)
238				bufferevent_suspend_write_(&bev->bev,
239				    BEV_SUSPEND_BW_GROUP);
240			else
241				bufferevent_suspend_read_(&bev->bev,
242				    BEV_SUSPEND_BW_GROUP);
243			share = 0;
244		} else {
245			/* XXXX probably we should divide among the active
246			 * members, not the total members. */
247			share = LIM(g->rate_limit) / g->n_members;
248			if (share < g->min_share)
249				share = g->min_share;
250		}
251		UNLOCK_GROUP(g);
252		CLAMPTO(share);
253	}
254
255	if (max_so_far < 0)
256		max_so_far = 0;
257	return max_so_far;
258}
259
260ev_ssize_t
261bufferevent_get_read_max_(struct bufferevent_private *bev)
262{
263	return bufferevent_get_rlim_max_(bev, 0);
264}
265
266ev_ssize_t
267bufferevent_get_write_max_(struct bufferevent_private *bev)
268{
269	return bufferevent_get_rlim_max_(bev, 1);
270}
271
272int
273bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
274{
275	/* XXXXX Make sure all users of this function check its return value */
276	int r = 0;
277	/* need to hold lock on bev */
278	if (!bev->rate_limiting)
279		return 0;
280
281	if (bev->rate_limiting->cfg) {
282		bev->rate_limiting->limit.read_limit -= bytes;
283		if (bev->rate_limiting->limit.read_limit <= 0) {
284			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285			if (event_add(&bev->rate_limiting->refill_bucket_event,
286				&bev->rate_limiting->cfg->tick_timeout) < 0)
287				r = -1;
288		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
289			if (!(bev->write_suspended & BEV_SUSPEND_BW))
290				event_del(&bev->rate_limiting->refill_bucket_event);
291			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
292		}
293	}
294
295	if (bev->rate_limiting->group) {
296		LOCK_GROUP(bev->rate_limiting->group);
297		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298		bev->rate_limiting->group->total_read += bytes;
299		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300			bev_group_suspend_reading_(bev->rate_limiting->group);
301		} else if (bev->rate_limiting->group->read_suspended) {
302			bev_group_unsuspend_reading_(bev->rate_limiting->group);
303		}
304		UNLOCK_GROUP(bev->rate_limiting->group);
305	}
306
307	return r;
308}
309
310int
311bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
312{
313	/* XXXXX Make sure all users of this function check its return value */
314	int r = 0;
315	/* need to hold lock */
316	if (!bev->rate_limiting)
317		return 0;
318
319	if (bev->rate_limiting->cfg) {
320		bev->rate_limiting->limit.write_limit -= bytes;
321		if (bev->rate_limiting->limit.write_limit <= 0) {
322			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323			if (event_add(&bev->rate_limiting->refill_bucket_event,
324				&bev->rate_limiting->cfg->tick_timeout) < 0)
325				r = -1;
326		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
327			if (!(bev->read_suspended & BEV_SUSPEND_BW))
328				event_del(&bev->rate_limiting->refill_bucket_event);
329			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
330		}
331	}
332
333	if (bev->rate_limiting->group) {
334		LOCK_GROUP(bev->rate_limiting->group);
335		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336		bev->rate_limiting->group->total_written += bytes;
337		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338			bev_group_suspend_writing_(bev->rate_limiting->group);
339		} else if (bev->rate_limiting->group->write_suspended) {
340			bev_group_unsuspend_writing_(bev->rate_limiting->group);
341		}
342		UNLOCK_GROUP(bev->rate_limiting->group);
343	}
344
345	return r;
346}
347
348/** Stop reading on every bufferevent in <b>g</b> */
349static int
350bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
351{
352	/* Needs group lock */
353	struct bufferevent_private *bev;
354	g->read_suspended = 1;
355	g->pending_unsuspend_read = 0;
356
357	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
359	   the bufferevent locks.  If we are unable to lock any individual
360	   bufferevent, it will find out later when it looks at its limit
361	   and sees that its group is suspended.)
362	*/
363	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364		if (EVLOCK_TRY_LOCK_(bev->lock)) {
365			bufferevent_suspend_read_(&bev->bev,
366			    BEV_SUSPEND_BW_GROUP);
367			EVLOCK_UNLOCK(bev->lock, 0);
368		}
369	}
370	return 0;
371}
372
373/** Stop writing on every bufferevent in <b>g</b> */
374static int
375bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
376{
377	/* Needs group lock */
378	struct bufferevent_private *bev;
379	g->write_suspended = 1;
380	g->pending_unsuspend_write = 0;
381	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382		if (EVLOCK_TRY_LOCK_(bev->lock)) {
383			bufferevent_suspend_write_(&bev->bev,
384			    BEV_SUSPEND_BW_GROUP);
385			EVLOCK_UNLOCK(bev->lock, 0);
386		}
387	}
388	return 0;
389}
390
391/** Timer callback invoked on a single bufferevent with one or more exhausted
392    buckets when they are ready to refill. */
393static void
394bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
395{
396	unsigned tick;
397	struct timeval now;
398	struct bufferevent_private *bev = arg;
399	int again = 0;
400	BEV_LOCK(&bev->bev);
401	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402		BEV_UNLOCK(&bev->bev);
403		return;
404	}
405
406	/* First, update the bucket */
407	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408	tick = ev_token_bucket_get_tick_(&now,
409	    bev->rate_limiting->cfg);
410	ev_token_bucket_update_(&bev->rate_limiting->limit,
411	    bev->rate_limiting->cfg,
412	    tick);
413
414	/* Now unsuspend any read/write operations as appropriate. */
415	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416		if (bev->rate_limiting->limit.read_limit > 0)
417			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418		else
419			again = 1;
420	}
421	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422		if (bev->rate_limiting->limit.write_limit > 0)
423			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424		else
425			again = 1;
426	}
427	if (again) {
428		/* One or more of the buckets may need another refill if they
429		   started negative.
430
431		   XXXX if we need to be quiet for more ticks, we should
432		   maybe figure out what timeout we really want.
433		*/
434		/* XXXX Handle event_add failure somehow */
435		event_add(&bev->rate_limiting->refill_bucket_event,
436		    &bev->rate_limiting->cfg->tick_timeout);
437	}
438	BEV_UNLOCK(&bev->bev);
439}
440
441/** Helper: grab a random element from a bufferevent group.
442 *
443 * Requires that we hold the lock on the group.
444 */
445static struct bufferevent_private *
446bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447{
448	int which;
449	struct bufferevent_private *bev;
450
451	/* requires group lock */
452
453	if (!group->n_members)
454		return NULL;
455
456	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457
458	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459
460	bev = LIST_FIRST(&group->members);
461	while (which--)
462		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463
464	return bev;
465}
466
/** Iterate over the elements of a rate-limiting group 'g' starting at a
    random member, assigning each to the variable 'bev' and executing
    'block'.  The expansion site must have 'g', 'bev' and 'first' in
    scope, and must hold the group lock (bev_group_random_element_
    requires it).

    The iteration runs in two passes:
    1. from the random starting member to the tail of the list;
    2. from the head up to, but not including, that starting member.
    Unless 'block' modifies the list, each member is visited once.

    This is a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		    bev != LIST_END(&g->members);			\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		    bev && bev != first;				\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
486
487static void
488bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
489{
490	int again = 0;
491	struct bufferevent_private *bev, *first;
492
493	g->read_suspended = 0;
494	FOREACH_RANDOM_ORDER({
495		if (EVLOCK_TRY_LOCK_(bev->lock)) {
496			bufferevent_unsuspend_read_(&bev->bev,
497			    BEV_SUSPEND_BW_GROUP);
498			EVLOCK_UNLOCK(bev->lock, 0);
499		} else {
500			again = 1;
501		}
502	});
503	g->pending_unsuspend_read = again;
504}
505
506static void
507bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
508{
509	int again = 0;
510	struct bufferevent_private *bev, *first;
511	g->write_suspended = 0;
512
513	FOREACH_RANDOM_ORDER({
514		if (EVLOCK_TRY_LOCK_(bev->lock)) {
515			bufferevent_unsuspend_write_(&bev->bev,
516			    BEV_SUSPEND_BW_GROUP);
517			EVLOCK_UNLOCK(bev->lock, 0);
518		} else {
519			again = 1;
520		}
521	});
522	g->pending_unsuspend_write = again;
523}
524
525/** Callback invoked every tick to add more elements to the group bucket
526    and unsuspend group members as needed.
527 */
528static void
529bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
530{
531	struct bufferevent_rate_limit_group *g = arg;
532	unsigned tick;
533	struct timeval now;
534
535	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
536
537	LOCK_GROUP(g);
538
539	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
541
542	if (g->pending_unsuspend_read ||
543	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544		bev_group_unsuspend_reading_(g);
545	}
546	if (g->pending_unsuspend_write ||
547	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548		bev_group_unsuspend_writing_(g);
549	}
550
551	/* XXXX Rather than waiting to the next tick to unsuspend stuff
552	 * with pending_unsuspend_write/read, we should do it on the
553	 * next iteration of the mainloop.
554	 */
555
556	UNLOCK_GROUP(g);
557}
558
559int
560bufferevent_set_rate_limit(struct bufferevent *bev,
561    struct ev_token_bucket_cfg *cfg)
562{
563	struct bufferevent_private *bevp =
564	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
565	int r = -1;
566	struct bufferevent_rate_limit *rlim;
567	struct timeval now;
568	ev_uint32_t tick;
569	int reinit = 0, suspended = 0;
570	/* XXX reference-count cfg */
571
572	BEV_LOCK(bev);
573
574	if (cfg == NULL) {
575		if (bevp->rate_limiting) {
576			rlim = bevp->rate_limiting;
577			rlim->cfg = NULL;
578			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
579			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
580			if (event_initialized(&rlim->refill_bucket_event))
581				event_del(&rlim->refill_bucket_event);
582		}
583		r = 0;
584		goto done;
585	}
586
587	event_base_gettimeofday_cached(bev->ev_base, &now);
588	tick = ev_token_bucket_get_tick_(&now, cfg);
589
590	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
591		/* no-op */
592		r = 0;
593		goto done;
594	}
595	if (bevp->rate_limiting == NULL) {
596		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
597		if (!rlim)
598			goto done;
599		bevp->rate_limiting = rlim;
600	} else {
601		rlim = bevp->rate_limiting;
602	}
603	reinit = rlim->cfg != NULL;
604
605	rlim->cfg = cfg;
606	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
607
608	if (reinit) {
609		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
610		event_del(&rlim->refill_bucket_event);
611	}
612	event_assign(&rlim->refill_bucket_event, bev->ev_base,
613	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
614
615	if (rlim->limit.read_limit > 0) {
616		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
617	} else {
618		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
619		suspended=1;
620	}
621	if (rlim->limit.write_limit > 0) {
622		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
623	} else {
624		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
625		suspended = 1;
626	}
627
628	if (suspended)
629		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
630
631	r = 0;
632
633done:
634	BEV_UNLOCK(bev);
635	return r;
636}
637
638struct bufferevent_rate_limit_group *
639bufferevent_rate_limit_group_new(struct event_base *base,
640    const struct ev_token_bucket_cfg *cfg)
641{
642	struct bufferevent_rate_limit_group *g;
643	struct timeval now;
644	ev_uint32_t tick;
645
646	event_base_gettimeofday_cached(base, &now);
647	tick = ev_token_bucket_get_tick_(&now, cfg);
648
649	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
650	if (!g)
651		return NULL;
652	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
653	LIST_INIT(&g->members);
654
655	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
656
657	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
658	    bev_group_refill_callback_, g);
659	/*XXXX handle event_add failure */
660	event_add(&g->master_refill_event, &cfg->tick_timeout);
661
662	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
663
664	bufferevent_rate_limit_group_set_min_share(g, 64);
665
666	evutil_weakrand_seed_(&g->weakrand_seed,
667	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
668
669	return g;
670}
671
672int
673bufferevent_rate_limit_group_set_cfg(
674	struct bufferevent_rate_limit_group *g,
675	const struct ev_token_bucket_cfg *cfg)
676{
677	int same_tick;
678	if (!g || !cfg)
679		return -1;
680
681	LOCK_GROUP(g);
682	same_tick = evutil_timercmp(
683		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
684	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
685
686	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
687		g->rate_limit.read_limit = cfg->read_maximum;
688	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
689		g->rate_limit.write_limit = cfg->write_maximum;
690
691	if (!same_tick) {
692		/* This can cause a hiccup in the schedule */
693		event_add(&g->master_refill_event, &cfg->tick_timeout);
694	}
695
696	/* The new limits might force us to adjust min_share differently. */
697	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
698
699	UNLOCK_GROUP(g);
700	return 0;
701}
702
703int
704bufferevent_rate_limit_group_set_min_share(
705	struct bufferevent_rate_limit_group *g,
706	size_t share)
707{
708	if (share > EV_SSIZE_MAX)
709		return -1;
710
711	g->configured_min_share = share;
712
713	/* Can't set share to less than the one-tick maximum.  IOW, at steady
714	 * state, at least one connection can go per tick. */
715	if (share > g->rate_limit_cfg.read_rate)
716		share = g->rate_limit_cfg.read_rate;
717	if (share > g->rate_limit_cfg.write_rate)
718		share = g->rate_limit_cfg.write_rate;
719
720	g->min_share = share;
721	return 0;
722}
723
724void
725bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
726{
727	LOCK_GROUP(g);
728	EVUTIL_ASSERT(0 == g->n_members);
729	event_del(&g->master_refill_event);
730	UNLOCK_GROUP(g);
731	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
732	mm_free(g);
733}
734
735int
736bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
737    struct bufferevent_rate_limit_group *g)
738{
739	int wsuspend, rsuspend;
740	struct bufferevent_private *bevp =
741	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
742	BEV_LOCK(bev);
743
744	if (!bevp->rate_limiting) {
745		struct bufferevent_rate_limit *rlim;
746		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
747		if (!rlim) {
748			BEV_UNLOCK(bev);
749			return -1;
750		}
751		event_assign(&rlim->refill_bucket_event, bev->ev_base,
752		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
753		bevp->rate_limiting = rlim;
754	}
755
756	if (bevp->rate_limiting->group == g) {
757		BEV_UNLOCK(bev);
758		return 0;
759	}
760	if (bevp->rate_limiting->group)
761		bufferevent_remove_from_rate_limit_group(bev);
762
763	LOCK_GROUP(g);
764	bevp->rate_limiting->group = g;
765	++g->n_members;
766	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
767
768	rsuspend = g->read_suspended;
769	wsuspend = g->write_suspended;
770
771	UNLOCK_GROUP(g);
772
773	if (rsuspend)
774		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
775	if (wsuspend)
776		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
777
778	BEV_UNLOCK(bev);
779	return 0;
780}
781
/* Remove BEV from its rate-limit group (if any), and lift any suspension
 * the group had imposed on it.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;
	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    unsuspend);
}
787
788int
789bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
790    int unsuspend)
791{
792	struct bufferevent_private *bevp =
793	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
794	BEV_LOCK(bev);
795	if (bevp->rate_limiting && bevp->rate_limiting->group) {
796		struct bufferevent_rate_limit_group *g =
797		    bevp->rate_limiting->group;
798		LOCK_GROUP(g);
799		bevp->rate_limiting->group = NULL;
800		--g->n_members;
801		LIST_REMOVE(bevp, rate_limiting->next_in_group);
802		UNLOCK_GROUP(g);
803	}
804	if (unsuspend) {
805		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
806		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
807	}
808	BEV_UNLOCK(bev);
809	return 0;
810}
811
812/* ===
813 * API functions to expose rate limits.
814 *
815 * Don't use these from inside Libevent; they're meant to be for use by
816 * the program.
817 * === */
818
819/* Mostly you don't want to use this function from inside libevent;
820 * bufferevent_get_read_max_() is more likely what you want*/
821ev_ssize_t
822bufferevent_get_read_limit(struct bufferevent *bev)
823{
824	ev_ssize_t r;
825	struct bufferevent_private *bevp;
826	BEV_LOCK(bev);
827	bevp = BEV_UPCAST(bev);
828	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
829		bufferevent_update_buckets(bevp);
830		r = bevp->rate_limiting->limit.read_limit;
831	} else {
832		r = EV_SSIZE_MAX;
833	}
834	BEV_UNLOCK(bev);
835	return r;
836}
837
838/* Mostly you don't want to use this function from inside libevent;
839 * bufferevent_get_write_max_() is more likely what you want*/
840ev_ssize_t
841bufferevent_get_write_limit(struct bufferevent *bev)
842{
843	ev_ssize_t r;
844	struct bufferevent_private *bevp;
845	BEV_LOCK(bev);
846	bevp = BEV_UPCAST(bev);
847	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
848		bufferevent_update_buckets(bevp);
849		r = bevp->rate_limiting->limit.write_limit;
850	} else {
851		r = EV_SSIZE_MAX;
852	}
853	BEV_UNLOCK(bev);
854	return r;
855}
856
857int
858bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
859{
860	struct bufferevent_private *bevp;
861	BEV_LOCK(bev);
862	bevp = BEV_UPCAST(bev);
863	if (size == 0 || size > EV_SSIZE_MAX)
864		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
865	else
866		bevp->max_single_read = size;
867	BEV_UNLOCK(bev);
868	return 0;
869}
870
871int
872bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
873{
874	struct bufferevent_private *bevp;
875	BEV_LOCK(bev);
876	bevp = BEV_UPCAST(bev);
877	if (size == 0 || size > EV_SSIZE_MAX)
878		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
879	else
880		bevp->max_single_write = size;
881	BEV_UNLOCK(bev);
882	return 0;
883}
884
885ev_ssize_t
886bufferevent_get_max_single_read(struct bufferevent *bev)
887{
888	ev_ssize_t r;
889
890	BEV_LOCK(bev);
891	r = BEV_UPCAST(bev)->max_single_read;
892	BEV_UNLOCK(bev);
893	return r;
894}
895
896ev_ssize_t
897bufferevent_get_max_single_write(struct bufferevent *bev)
898{
899	ev_ssize_t r;
900
901	BEV_LOCK(bev);
902	r = BEV_UPCAST(bev)->max_single_write;
903	BEV_UNLOCK(bev);
904	return r;
905}
906
907ev_ssize_t
908bufferevent_get_max_to_read(struct bufferevent *bev)
909{
910	ev_ssize_t r;
911	BEV_LOCK(bev);
912	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
913	BEV_UNLOCK(bev);
914	return r;
915}
916
917ev_ssize_t
918bufferevent_get_max_to_write(struct bufferevent *bev)
919{
920	ev_ssize_t r;
921	BEV_LOCK(bev);
922	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
923	BEV_UNLOCK(bev);
924	return r;
925}
926
927const struct ev_token_bucket_cfg *
928bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
929	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
930	struct ev_token_bucket_cfg *cfg;
931
932	BEV_LOCK(bev);
933
934	if (bufev_private->rate_limiting) {
935		cfg = bufev_private->rate_limiting->cfg;
936	} else {
937		cfg = NULL;
938	}
939
940	BEV_UNLOCK(bev);
941
942	return cfg;
943}
944
945/* Mostly you don't want to use this function from inside libevent;
946 * bufferevent_get_read_max_() is more likely what you want*/
947ev_ssize_t
948bufferevent_rate_limit_group_get_read_limit(
949	struct bufferevent_rate_limit_group *grp)
950{
951	ev_ssize_t r;
952	LOCK_GROUP(grp);
953	r = grp->rate_limit.read_limit;
954	UNLOCK_GROUP(grp);
955	return r;
956}
957
958/* Mostly you don't want to use this function from inside libevent;
959 * bufferevent_get_write_max_() is more likely what you want. */
960ev_ssize_t
961bufferevent_rate_limit_group_get_write_limit(
962	struct bufferevent_rate_limit_group *grp)
963{
964	ev_ssize_t r;
965	LOCK_GROUP(grp);
966	r = grp->rate_limit.write_limit;
967	UNLOCK_GROUP(grp);
968	return r;
969}
970
971int
972bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
973{
974	int r = 0;
975	ev_ssize_t old_limit, new_limit;
976	struct bufferevent_private *bevp;
977	BEV_LOCK(bev);
978	bevp = BEV_UPCAST(bev);
979	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
980	old_limit = bevp->rate_limiting->limit.read_limit;
981
982	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
983	if (old_limit > 0 && new_limit <= 0) {
984		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
985		if (event_add(&bevp->rate_limiting->refill_bucket_event,
986			&bevp->rate_limiting->cfg->tick_timeout) < 0)
987			r = -1;
988	} else if (old_limit <= 0 && new_limit > 0) {
989		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
990			event_del(&bevp->rate_limiting->refill_bucket_event);
991		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
992	}
993
994	BEV_UNLOCK(bev);
995	return r;
996}
997
998int
999bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
1000{
1001	/* XXXX this is mostly copy-and-paste from
1002	 * bufferevent_decrement_read_limit */
1003	int r = 0;
1004	ev_ssize_t old_limit, new_limit;
1005	struct bufferevent_private *bevp;
1006	BEV_LOCK(bev);
1007	bevp = BEV_UPCAST(bev);
1008	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1009	old_limit = bevp->rate_limiting->limit.write_limit;
1010
1011	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1012	if (old_limit > 0 && new_limit <= 0) {
1013		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1014		if (event_add(&bevp->rate_limiting->refill_bucket_event,
1015			&bevp->rate_limiting->cfg->tick_timeout) < 0)
1016			r = -1;
1017	} else if (old_limit <= 0 && new_limit > 0) {
1018		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1019			event_del(&bevp->rate_limiting->refill_bucket_event);
1020		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1021	}
1022
1023	BEV_UNLOCK(bev);
1024	return r;
1025}
1026
1027int
1028bufferevent_rate_limit_group_decrement_read(
1029	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1030{
1031	int r = 0;
1032	ev_ssize_t old_limit, new_limit;
1033	LOCK_GROUP(grp);
1034	old_limit = grp->rate_limit.read_limit;
1035	new_limit = (grp->rate_limit.read_limit -= decr);
1036
1037	if (old_limit > 0 && new_limit <= 0) {
1038		bev_group_suspend_reading_(grp);
1039	} else if (old_limit <= 0 && new_limit > 0) {
1040		bev_group_unsuspend_reading_(grp);
1041	}
1042
1043	UNLOCK_GROUP(grp);
1044	return r;
1045}
1046
1047int
1048bufferevent_rate_limit_group_decrement_write(
1049	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1050{
1051	int r = 0;
1052	ev_ssize_t old_limit, new_limit;
1053	LOCK_GROUP(grp);
1054	old_limit = grp->rate_limit.write_limit;
1055	new_limit = (grp->rate_limit.write_limit -= decr);
1056
1057	if (old_limit > 0 && new_limit <= 0) {
1058		bev_group_suspend_writing_(grp);
1059	} else if (old_limit <= 0 && new_limit > 0) {
1060		bev_group_unsuspend_writing_(grp);
1061	}
1062
1063	UNLOCK_GROUP(grp);
1064	return r;
1065}
1066
1067void
1068bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1069    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1070{
1071	EVUTIL_ASSERT(grp != NULL);
1072	if (total_read_out)
1073		*total_read_out = grp->total_read;
1074	if (total_written_out)
1075		*total_written_out = grp->total_written;
1076}
1077
1078void
1079bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1080{
1081	grp->total_read = grp->total_written = 0;
1082}
1083
1084int
1085bufferevent_ratelim_init_(struct bufferevent_private *bev)
1086{
1087	bev->rate_limiting = NULL;
1088	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1089	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1090
1091	return 0;
1092}
1093