/*	$NetBSD: bufferevent_ratelim.c,v 1.1 2013/04/11 16:43:24 christos Exp $	*/
/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"
int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
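
/*
 * A worked example of the overflow-safe refill above, with illustrative
 * numbers that are not from the original source: suppose read_rate is
 * 1000 bytes/tick, read_maximum (the burst) is 4000, the bucket holds
 * read_limit == 3500, and 2 ticks have elapsed.  Then
 *
 *	(4000 - 3500) / 2 == 250 < 1000
 *
 * so the bucket saturates at read_maximum (4000).  The naive form
 * 3500 + 2 * 1000 is harmless with numbers this small, but with limits
 * and rates near the top of the type's range it could overflow, which
 * the division-based test avoids.
 */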

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
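
/*
 * Example of the tick arithmetic above, with made-up numbers: for a
 * tick length of 500 msec (msec_per_tick == 500) and a cached time of
 * tv_sec == 2, tv_usec == 250000, msec is 2250 and the tick number is
 * 2250 / 500 == 4.  Any timeval in [2.0s, 2.5s) maps to tick 4.
 */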

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
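
/*
 * Example of the units involved (illustrative numbers): rates and
 * bursts are in bytes per tick, not per second.  A sketch building a
 * config for 256 KB/sec sustained with 128 KB bursts, using a 250 msec
 * tick, i.e. four refills per second:
 *
 *	struct timeval tick = { 0, 250000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 131072, 65536, 131072, &tick);
 *
 * Here msec_per_tick comes out to 250, and 65536 bytes per tick at
 * 4 ticks/sec gives the 256 KB/sec sustained rate.
 */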

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ? MAX_TO_WRITE_EVER : MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
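
/*
 * Worked example of the group-share math above (illustrative numbers):
 * with a group read bucket of 10000 bytes, 8 members, and the default
 * min_share of 64, each member's share is 10000 / 8 == 1250.  With 200
 * members the raw share would be 50, so it is raised to the 64-byte
 * min_share; that keeps each connection's operations large enough to
 * be worth the syscall, even though the group bucket can then be
 * briefly overdrawn.
 */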

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
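
/*
 * For example (illustrative): if the member list is A, B, C, D and the
 * random starting element is C, the first loop above visits C, D and
 * the second loop wraps around to visit A, B.  Every member is visited
 * exactly once per call, with the head of the line rotating randomly,
 * so no member is systematically starved of bucket refills.
 */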

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
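
/*
 * A minimal sketch of attaching and later clearing a per-bufferevent
 * limit; the bufferevent `bev` and config `cfg` are assumed to already
 * exist:
 *
 *	bufferevent_set_rate_limit(bev, cfg);
 *	... traffic on bev is now limited ...
 *	bufferevent_set_rate_limit(bev, NULL);
 *	ev_token_bucket_cfg_free(cfg);
 *
 * Since the cfg is not reference-counted (see the XXX above), it must
 * stay alive for as long as any bufferevent is still using it.
 */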

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
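
/*
 * Usage sketch (illustrative): capping the aggregate bandwidth of many
 * connections, e.g. all clients of a server, rather than each one
 * individually.  `base`, `tick`, and the bufferevents are assumed to
 * exist already.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, &tick);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	ev_token_bucket_cfg_free(cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *
 * Unlike bufferevent_set_rate_limit(), the group memcpy()s the cfg
 * above, so the caller's copy need not outlive the group.
 */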

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set the min_share any higher than the one-tick refill; IOW,
	 * at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
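
/*
 * For example (illustrative numbers): with a read_rate of 1000 bytes
 * per tick, a requested min_share of 4000 would need four ticks of
 * refill before a single suspended connection could be given its
 * minimum share, so the share is clamped to 1000: one tick's refill is
 * then always enough to let at least one connection proceed.
 */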

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
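
/*
 * Sketch of one way an application might use the decrement calls above
 * (illustrative; `bev` and the byte counts are assumptions): charging
 * out-of-band traffic, such as bytes sent on a side channel, against a
 * bufferevent's budget so total bandwidth stays under the limit.
 *
 *	ev_ssize_t n = bufferevent_get_write_limit(bev);
 *	if (n > 0) {
 *		... send up to n bytes on the side channel ...
 *		bufferevent_decrement_write_limit(bev, bytes_sent);
 *	}
 *
 * Since decr is signed, a negative value can also credit bytes back;
 * the old_limit/new_limit checks above handle unsuspending in that
 * case.
 */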

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}