/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include "evconfig-private.h"

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
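
/* A worked example of the overflow guard above (illustrative numbers,
 * not from the source).  Suppose read_maximum = 100, read_limit = 40,
 * read_rate = 30, and n_ticks = 3.  The naive sum 40 + 3*30 = 130 would
 * exceed the maximum; the guard detects this without computing it,
 * since (100 - 40) / 3 == 20 is less than the rate of 30, so we clamp
 * straight to 100.  With n_ticks = 1 instead, (100 - 40) / 1 == 60 is
 * at least 30, so we add 1 * 30 and land at 70.  Because the check only
 * subtracts and divides values bounded by the configured maximum, the
 * intermediate results cannot overflow even for very large n_ticks. */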

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update_(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
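
/* Example (illustrative numbers): with msec_per_tick = 250, i.e. a
 * quarter-second tick, a timeval of { tv_sec = 10, tv_usec = 600000 }
 * gives msec = 10 * 1000 + 600000 / 1000 = 10600, and the tick number
 * is 10600 / 250 = 42. */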

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
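
/* A minimal usage sketch (illustrative only; the rates and the
 * bufferevent variable "bev" are hypothetical, not part of this file):
 *
 *	struct timeval tick = { 1, 0 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(8192, 16384, 8192, 16384, &tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * This allows an average of 8192 bytes per one-second tick in each
 * direction, with bursts of up to 16384 bytes.  The rate arguments are
 * per tick, so halving tick_len while halving the rates keeps the same
 * average but smooths the traffic. */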

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* Default values for the max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    true, or the maximum amount we should read if it is false.  Return
    that maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* Needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
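
/* In other words, the cap returned above is (conceptually) the minimum
 * of three values: the per-operation max_single_read/write cap, the
 * bufferevent's own bucket level, and this bufferevent's share of the
 * group bucket, where the share is floored at the group's min_share
 * unless the group as a whole is suspended. */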

ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 0);
}

ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	return bufferevent_get_rlim_max_(bev, 1);
}

int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
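
/* Both decrement functions above implement the same cycle: spending
 * bytes can drive a bucket to or below zero, in which case the affected
 * direction is suspended and the refill event is scheduled one tick
 * out; bev_refill_callback_() later tops the bucket back up and
 * unsuspends it.  The "bytes" argument may also be negative (a refund),
 * which is why each function additionally handles the transition from
 * an exhausted bucket back to a positive one. */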

/** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group.
 *
 * Requires that we hold the lock on the group.
 */
static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));

	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);

	bev = LIST_FIRST(&group->members);
	while (which--)
		bev = LIST_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
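
/* For example (illustrative): with members A -> B -> C -> D and a
 * random starting element of C, the first loop above visits C and D,
 * and the second visits A and B, stopping before it reaches C again.
 * Every member is visited exactly once, but the starting point varies
 * from call to call, so no single member is always served first when
 * the bucket is too small for everyone. */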

static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
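
/* A minimal usage sketch (illustrative only; "bev" and the values are
 * hypothetical):
 *
 *	struct timeval tick = { 0, 500000 };	// half-second ticks
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(4096, 8192, 4096, 8192, &tick);
 *	bufferevent_set_rate_limit(bev, cfg);
 *	...
 *	bufferevent_set_rate_limit(bev, NULL);	// remove the limit
 *
 * As the XXX above notes, the cfg is not reference-counted: the caller
 * must keep it alive for as long as any bufferevent uses it, and free
 * it with ev_token_bucket_cfg_free() afterwards. */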

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
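
/* A minimal usage sketch (illustrative only; "base", "bev1", and
 * "bev2" are hypothetical).  All members of a group draw from one
 * shared bucket, so the two bufferevents below together stay near
 * 16384 bytes/sec in each direction:
 *
 *	struct timeval sec = { 1, 0 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(16384, 16384, 16384, 16384, &sec);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 *
 * Unlike bufferevent_set_rate_limit(), the group copies the cfg into
 * itself (note the memcpy above), so the caller may free the cfg once
 * the group has been created. */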

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Don't let the effective share exceed the one-tick rate; IOW, at
	 * steady state, at least one connection can make progress per
	 * tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
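
/* Example (illustrative numbers): with a group read_rate of 10000
 * bytes per tick, 64 members, and the default min_share of 64, a full
 * bucket divides into shares of 10000 / 64 = 156 bytes, which is above
 * the floor, so each member may read up to 156 bytes.  With 1000
 * members the raw share would be only 10 bytes, so the floor raises it
 * to 64; members then drain the bucket first-come-first-served instead
 * of all crawling along at 10 bytes apiece. */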

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
	else
		bevp->max_single_read = size;
	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
{
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (size == 0 || size > EV_SSIZE_MAX)
		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
	else
		bevp->max_single_write = size;
	BEV_UNLOCK(bev);
	return 0;
}

ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
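
/* Note that bufferevent_get_max_to_read()/_write() above are the
 * public counterparts of the internal bufferevent_get_read_max_() and
 * bufferevent_get_write_max_(): they report how many bytes the next
 * read or write may currently transfer, already combining the
 * single-operation cap, the private bucket, and the group share. */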

const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent *bev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
	struct ev_token_bucket_cfg *cfg;

	BEV_LOCK(bev);

	if (bufev_private->rate_limiting) {
		cfg = bufev_private->rate_limiting->cfg;
	} else {
		cfg = NULL;
	}

	BEV_UNLOCK(bev);

	return cfg;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_read_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
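
/* A minimal usage sketch (illustrative only; "bev" is hypothetical).
 * The two decrement functions above let an application charge or
 * refund bytes out of band, for example to account for overhead the
 * bufferevent itself never sees:
 *
 *	bufferevent_decrement_write_limit(bev, 512);	// charge 512 bytes
 *	bufferevent_decrement_read_limit(bev, -512);	// refund 512 bytes
 *
 * A negative decrement adds tokens back; as the code above shows,
 * crossing zero in either direction suspends or unsuspends the
 * corresponding direction and schedules or cancels the refill event. */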

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_reading_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_reading_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		bev_group_suspend_writing_(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		bev_group_unsuspend_writing_(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}

int
bufferevent_ratelim_init_(struct bufferevent_private *bev)
{
	bev->rate_limiting = NULL;
	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;

	return 0;
}