/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/
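	/* (The checks below are the naive test rearranged so that no
	 * intermediate value can overflow: "limit + n_ticks * rate >
	 * maximum" becomes "(maximum - limit) / n_ticks < rate".  For
	 * illustration, with read_maximum = 10000, read_limit = -2000,
	 * n_ticks = 3, and read_rate = 5000: (10000 - (-2000)) / 3 = 4000,
	 * which is less than 5000, so we clamp to the maximum -- just as
	 * the naive -2000 + 3*5000 = 13000 > 10000 would have.) */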

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
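
/* For illustration: with a tick length of 100 msec (msec_per_tick = 100),
 * a cached time of tv = { 5, 250000 } (5.25 seconds) yields
 * msec = 5*1000 + 250000/1000 = 5250, so the tick number is
 * 5250 / 100 = 52.  Tick numbers are absolute counters derived from the
 * clock; ev_token_bucket_update_() works with differences between them,
 * so wraparound is harmless. */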

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
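
/* A minimal usage sketch (illustrative only; "bev" is an existing
 * bufferevent and error handling is elided).  The rate arguments are
 * average bytes allowed per tick, so a 250 msec tick needs one quarter
 * of the desired per-second rate; the burst arguments are the bucket
 * capacities.  To cap a bufferevent at roughly 256 KiB/s each way with
 * 512 KiB bursts:
 *
 *	struct timeval tick = { 0, 250000 };
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    256*1024/4, 512*1024, 256*1024/4, 512*1024, &tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * bufferevent_set_rate_limit() stores the cfg pointer rather than
 * copying it, so the cfg must outlive its use (see the XXX note about
 * reference-counting cfg below). */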

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ? bev->max_single_write : bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
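
/* A minimal usage sketch (illustrative only; error handling elided):
 * share one 1 MiB/s budget, with the default one-second tick, across
 * every bufferevent added to the group:
 *
 *	struct ev_token_bucket_cfg *cfg = ev_token_bucket_cfg_new(
 *	    1024*1024, 1024*1024, 1024*1024, 1024*1024, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 *
 * Unlike the per-bufferevent case, the group memcpy()s the cfg into
 * itself, so the caller may free cfg once the group exists. */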

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective share exceed what one tick can refill.
	 * IOW, at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
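
/* For illustration: with a group read_rate of 1000 bytes per tick, a
 * requested min_share of 4096 is remembered in configured_min_share but
 * clamped to an effective min_share of 1000, so one tick's refill is
 * always enough to let at least one member make progress. */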

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */
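
/* A minimal usage sketch (illustrative only): a program doing its own
 * I/O on a rate-limited bufferevent's behalf can query the remaining
 * budget and then charge it by hand.  do_custom_read() is hypothetical;
 * the decrement call requires that a rate limit is actually set.
 *
 *	ev_ssize_t budget = bufferevent_get_read_limit(bev);
 *	if (budget > 0) {
 *		ev_ssize_t n = do_custom_read(fd, budget);
 *		if (n > 0)
 *			bufferevent_decrement_read_limit(bev, n);
 *	}
 */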

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}