1/*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 *    derived from this software without specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include <sys/types.h>
30#include <limits.h>
31#include <string.h>
32#include <stdlib.h>
33
34#include "event2/event.h"
35#include "event2/event_struct.h"
36#include "event2/util.h"
37#include "event2/bufferevent.h"
38#include "event2/bufferevent_struct.h"
39#include "event2/buffer.h"
40
41#include "ratelim-internal.h"
42
43#include "bufferevent-internal.h"
44#include "mm-internal.h"
45#include "util-internal.h"
46#include "event-internal.h"
47
48int
49ev_token_bucket_init(struct ev_token_bucket *bucket,
50    const struct ev_token_bucket_cfg *cfg,
51    ev_uint32_t current_tick,
52    int reinitialize)
53{
54	if (reinitialize) {
55		/* on reinitialization, we only clip downwards, since we've
56		   already used who-knows-how-much bandwidth this tick.  We
57		   leave "last_updated" as it is; the next update will add the
58		   appropriate amount of bandwidth to the bucket.
59		*/
60		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
61			bucket->read_limit = cfg->read_maximum;
62		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
63			bucket->write_limit = cfg->write_maximum;
64	} else {
65		bucket->read_limit = cfg->read_rate;
66		bucket->write_limit = cfg->write_rate;
67		bucket->last_updated = current_tick;
68	}
69	return 0;
70}
71
/** Refill 'bucket' with cfg->read_rate/write_rate tokens for every tick
 *  that has elapsed since bucket->last_updated, clamping each limit at its
 *  configured maximum.
 *
 *  Returns 1 if the bucket was updated; 0 if no whole tick has elapsed
 *  (or the tick counter appears to have jumped backwards by a huge
 *  amount, which we treat the same way).
 */
int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* The division form below tests "would limit + n_ticks*rate exceed
	 * maximum?" without ever computing the possibly-overflowing
	 * product; if so, just clamp at the maximum. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
111
/** Bring bev's private token bucket up to date with the current tick.
 *  No-op if the bucket was already updated this tick.  The caller must
 *  hold the lock on bev, and bev->rate_limiting (with a non-NULL cfg)
 *  must be set. */
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	/* Use the event base's cached notion of "now" rather than making
	 * a fresh time syscall on every call. */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}
124
125ev_uint32_t
126ev_token_bucket_get_tick(const struct timeval *tv,
127    const struct ev_token_bucket_cfg *cfg)
128{
129	/* This computation uses two multiplies and a divide.  We could do
130	 * fewer if we knew that the tick length was an integer number of
131	 * seconds, or if we knew it divided evenly into a second.  We should
132	 * investigate that more.
133	 */
134
135	/* We cast to an ev_uint64_t first, since we don't want to overflow
136	 * before we do the final divide. */
137	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
138	return (unsigned)(msec / cfg->msec_per_tick);
139}
140
141struct ev_token_bucket_cfg *
142ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
143    size_t write_rate, size_t write_burst,
144    const struct timeval *tick_len)
145{
146	struct ev_token_bucket_cfg *r;
147	struct timeval g;
148	if (! tick_len) {
149		g.tv_sec = 1;
150		g.tv_usec = 0;
151		tick_len = &g;
152	}
153	if (read_rate > read_burst || write_rate > write_burst ||
154	    read_rate < 1 || write_rate < 1)
155		return NULL;
156	if (read_rate > EV_RATE_LIMIT_MAX ||
157	    write_rate > EV_RATE_LIMIT_MAX ||
158	    read_burst > EV_RATE_LIMIT_MAX ||
159	    write_burst > EV_RATE_LIMIT_MAX)
160		return NULL;
161	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
162	if (!r)
163		return NULL;
164	r->read_rate = read_rate;
165	r->write_rate = write_rate;
166	r->read_maximum = read_burst;
167	r->write_maximum = write_burst;
168	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
169	r->msec_per_tick = (tick_len->tv_sec * 1000) +
170	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
171	return r;
172}
173
/** Release a configuration allocated with ev_token_bucket_cfg_new().
 *  NOTE(review): no reference counting -- the caller must ensure no
 *  bufferevent still points at 'cfg' before freeing it. */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
179
/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

/* Convenience wrappers for taking/releasing a rate-limit group's lock. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: suspend/unsuspend all members of a group.
 * All four require the group lock to be held. */
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
194
/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.  The result is the minimum of the
    hard per-operation cap, the bufferevent's own bucket (if configured),
    and this bufferevent's share of its group's bucket (if any).
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

/* Pick the read or write limit out of a bucket, per is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

/* Is the group suspended in the direction we care about? */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket can go negative (we may overdraw it on a large
	 * read/write); report that as "nothing allowed". */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
261
262ev_ssize_t
263_bufferevent_get_read_max(struct bufferevent_private *bev)
264{
265	return _bufferevent_get_rlim_max(bev, 0);
266}
267
268ev_ssize_t
269_bufferevent_get_write_max(struct bufferevent_private *bev)
270{
271	return _bufferevent_get_rlim_max(bev, 1);
272}
273
/** Charge 'bytes' of reading against bev's bucket and (if any) its
 *  group's bucket, suspending or unsuspending reads as the buckets cross
 *  zero.  Needs the lock on bev.  Returns 0 on success, -1 if the refill
 *  timer could not be scheduled. */
int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and schedule a
			 * refill at the next tick boundary. */
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket is positive again (bytes may be negative);
			 * drop the refill timer unless writes still need it. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the accounting on the group's shared bucket. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
311
/** Charge 'bytes' of writing against bev's bucket and (if any) its
 *  group's bucket, suspending or unsuspending writes as the buckets cross
 *  zero.  Needs the lock on bev.  Returns 0 on success, -1 if the refill
 *  timer could not be scheduled.  Mirror of
 *  _bufferevent_decrement_read_buckets(). */
int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing and schedule a
			 * refill at the next tick boundary. */
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket is positive again; drop the refill timer
			 * unless reads still need it. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the accounting on the group's shared bucket. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
349
/** Stop reading on every bufferevent in <b>g</b>.  Needs the group lock;
    always returns 0. */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
374
/** Stop writing on every bufferevent in <b>g</b>.  Needs the group lock;
    always returns 0.  Mirror of _bev_group_suspend_reading(); see the
    comment there for why we use EVLOCK_TRY_LOCK. */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
392
/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill.  'arg' is the
    struct bufferevent_private whose refill_bucket_event fired. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down while the timer was
	 * pending; nothing to do in that case. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
442
/** Helper: grab a random element from a bufferevent group.  Returns NULL
    if the group has no members.  Requires the group lock.
    NOTE(review): the '%' introduces slight modulo bias, which looks
    intentional/acceptable given this is only used for rough fairness. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	/* Walk 'which' steps from the head of the member list. */
	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}
465
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.  The first loop runs from the random start point to the
    end of the list; the second wraps around from the head back to the
    start point.  The caller must declare 'g', 'bev', and 'first' in scope
    and must hold the group lock.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
485
/** Resume reading on every member of <b>g</b> that we can lock.  Members
 *  we fail to lock are retried on the next tick via
 *  pending_unsuspend_read.  Requires the group lock. */
static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; try again next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
504
/** Resume writing on every member of <b>g</b> that we can lock.  Members
 *  we fail to lock are retried on the next tick via
 *  pending_unsuspend_write.  Requires the group lock. */
static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; try again next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
523
/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.  'arg' is the
    struct bufferevent_rate_limit_group whose master_refill_event fired.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Credit the group bucket for all ticks elapsed since last update. */
	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend either direction if members were left pending last
	 * tick, or if the direction was suspended and there is now at
	 * least min_share worth of tokens to hand out. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
557
/** Attach (or, with cfg == NULL, remove) a per-bufferevent rate limit.
 *  Returns 0 on success, -1 on allocation failure.
 *
 *  Setting the same cfg again is a no-op.  Replacing an existing cfg
 *  reinitializes the bucket (clipping downward only) and reschedules the
 *  refill timer.  Removing the limit unsuspends any BEV_SUSPEND_BW
 *  suspensions but keeps the rate_limiting struct (it may still carry
 *  group membership). */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: clear the cfg, lift any bandwidth
		 * suspensions, and cancel the pending refill timer. */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	/* Lazily allocate the rate_limiting struct on first use. */
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		/* Cancel the timer scheduled under the old cfg before
		 * re-assigning it (the tick length may have changed). */
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	/* Bring the suspend state in line with the (possibly clipped)
	 * bucket contents. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	/* XXX(review): event_add failure is ignored here, unlike in the
	 * decrement paths which return -1. */
	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
636
/** Create a new rate-limit group on 'base' governed by 'cfg'.
 *  The cfg is copied by value, so the caller may free its copy.
 *  Returns NULL on allocation failure.  The group's minimum per-member
 *  share defaults to 64 bytes per tick. */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	/* Persistent timer that refills the shared bucket every tick. */
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
667
/** Replace the configuration of group 'g' with 'cfg' (copied by value).
 *  Current bucket contents are clipped downward to the new maximums.  If
 *  the tick length changed, the refill timer is rescheduled (which can
 *  cause a hiccup in the refill schedule).  Returns 0 on success, -1 if
 *  either argument is NULL. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	/* Remember whether the tick length changed before we overwrite
	 * the old cfg. */
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
698
699int
700bufferevent_rate_limit_group_set_min_share(
701	struct bufferevent_rate_limit_group *g,
702	size_t share)
703{
704	if (share > EV_SSIZE_MAX)
705		return -1;
706
707	g->configured_min_share = share;
708
709	/* Can't set share to less than the one-tick maximum.  IOW, at steady
710	 * state, at least one connection can go per tick. */
711	if (share > g->rate_limit_cfg.read_rate)
712		share = g->rate_limit_cfg.read_rate;
713	if (share > g->rate_limit_cfg.write_rate)
714		share = g->rate_limit_cfg.write_rate;
715
716	g->min_share = share;
717	return 0;
718}
719
/** Destroy a rate-limit group.  All members must have been removed first
 *  (asserted).  Cancels the refill timer and frees the lock and the
 *  group itself. */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
730
/** Add 'bev' to rate-limit group 'g', allocating its rate_limiting struct
 *  if it has none, and moving it out of any group it already belongs to.
 *  If the group is currently suspended in either direction, the new
 *  member is suspended to match.  Returns 0 on success (including when
 *  bev is already in g), -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		/* Lazily allocate the rate_limiting struct; the refill
		 * timer is set up now even though no per-bev cfg exists. */
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		/* Already a member of this group: nothing to do. */
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspend state while holding its lock;
	 * apply it to the new member after releasing the group lock
	 * (we still hold the bev lock). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
777
/** Remove 'bev' from whatever rate-limit group it belongs to.  Public
 *  entry point: always lifts any group-based read/write suspensions as
 *  part of leaving.  Returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	/* unsuspend = 1: re-enable I/O that the group had suspended. */
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}
783
/** Remove 'bev' from its rate-limit group, if any.  If 'unsuspend' is
 *  true, also lift any BEV_SUSPEND_BW_GROUP suspensions on it.  A no-op
 *  (other than the unsuspend) when bev is not in a group.  Returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
807
808/* ===
809 * API functions to expose rate limits.
810 *
811 * Don't use these from inside Libevent; they're meant to be for use by
812 * the program.
813 * === */
814
815/* Mostly you don't want to use this function from inside libevent;
816 * _bufferevent_get_read_max() is more likely what you want*/
817ev_ssize_t
818bufferevent_get_read_limit(struct bufferevent *bev)
819{
820	ev_ssize_t r;
821	struct bufferevent_private *bevp;
822	BEV_LOCK(bev);
823	bevp = BEV_UPCAST(bev);
824	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
825		bufferevent_update_buckets(bevp);
826		r = bevp->rate_limiting->limit.read_limit;
827	} else {
828		r = EV_SSIZE_MAX;
829	}
830	BEV_UNLOCK(bev);
831	return r;
832}
833
834/* Mostly you don't want to use this function from inside libevent;
835 * _bufferevent_get_write_max() is more likely what you want*/
836ev_ssize_t
837bufferevent_get_write_limit(struct bufferevent *bev)
838{
839	ev_ssize_t r;
840	struct bufferevent_private *bevp;
841	BEV_LOCK(bev);
842	bevp = BEV_UPCAST(bev);
843	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
844		bufferevent_update_buckets(bevp);
845		r = bevp->rate_limiting->limit.write_limit;
846	} else {
847		r = EV_SSIZE_MAX;
848	}
849	BEV_UNLOCK(bev);
850	return r;
851}
852
853ev_ssize_t
854bufferevent_get_max_to_read(struct bufferevent *bev)
855{
856	ev_ssize_t r;
857	BEV_LOCK(bev);
858	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
859	BEV_UNLOCK(bev);
860	return r;
861}
862
863ev_ssize_t
864bufferevent_get_max_to_write(struct bufferevent *bev)
865{
866	ev_ssize_t r;
867	BEV_LOCK(bev);
868	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
869	BEV_UNLOCK(bev);
870	return r;
871}
872
873
874/* Mostly you don't want to use this function from inside libevent;
875 * _bufferevent_get_read_max() is more likely what you want*/
876ev_ssize_t
877bufferevent_rate_limit_group_get_read_limit(
878	struct bufferevent_rate_limit_group *grp)
879{
880	ev_ssize_t r;
881	LOCK_GROUP(grp);
882	r = grp->rate_limit.read_limit;
883	UNLOCK_GROUP(grp);
884	return r;
885}
886
887/* Mostly you don't want to use this function from inside libevent;
888 * _bufferevent_get_write_max() is more likely what you want. */
889ev_ssize_t
890bufferevent_rate_limit_group_get_write_limit(
891	struct bufferevent_rate_limit_group *grp)
892{
893	ev_ssize_t r;
894	LOCK_GROUP(grp);
895	r = grp->rate_limit.write_limit;
896	UNLOCK_GROUP(grp);
897	return r;
898}
899
/** Manually drain (or, with negative 'decr', refill) bev's read bucket by
 *  'decr' bytes, suspending reads when the bucket crosses into
 *  non-positive territory and unsuspending when it comes back.  Requires
 *  that a per-bufferevent cfg is set (asserted).  Returns 0 on success,
 *  -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Just ran dry: stop reading, schedule a refill. */
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Just refilled: drop the timer unless writes still need
		 * it, then resume reading. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
926
/** Manually drain (or, with negative 'decr', refill) bev's write bucket
 *  by 'decr' bytes, suspending/unsuspending writes as the bucket crosses
 *  zero.  Requires a per-bufferevent cfg (asserted).  Returns 0 on
 *  success, -1 if the refill timer could not be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* Just ran dry: stop writing, schedule a refill. */
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* Just refilled: drop the timer unless reads still need
		 * it, then resume writing. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
955
/** Manually drain (or, with negative 'decr', refill) the group's shared
 *  read bucket, suspending or unsuspending reading on the whole group as
 *  the bucket crosses zero.  Always returns 0. */
int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}
975
/** Manually drain (or, with negative 'decr', refill) the group's shared
 *  write bucket, suspending or unsuspending writing on the whole group as
 *  the bucket crosses zero.  Always returns 0.  Mirror of
 *  bufferevent_rate_limit_group_decrement_read(). */
int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}
995
996void
997bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
998    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
999{
1000	EVUTIL_ASSERT(grp != NULL);
1001	if (total_read_out)
1002		*total_read_out = grp->total_read;
1003	if (total_written_out)
1004		*total_written_out = grp->total_written;
1005}
1006
1007void
1008bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1009{
1010	grp->total_read = grp->total_written = 0;
1011}
1012