1290001Sglebius/* 2290001Sglebius * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3290001Sglebius * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4290001Sglebius * All rights reserved. 5290001Sglebius * 6290001Sglebius * Redistribution and use in source and binary forms, with or without 7290001Sglebius * modification, are permitted provided that the following conditions 8290001Sglebius * are met: 9290001Sglebius * 1. Redistributions of source code must retain the above copyright 10290001Sglebius * notice, this list of conditions and the following disclaimer. 11290001Sglebius * 2. Redistributions in binary form must reproduce the above copyright 12290001Sglebius * notice, this list of conditions and the following disclaimer in the 13290001Sglebius * documentation and/or other materials provided with the distribution. 14290001Sglebius * 3. The name of the author may not be used to endorse or promote products 15290001Sglebius * derived from this software without specific prior written permission. 16290001Sglebius * 17290001Sglebius * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18290001Sglebius * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19290001Sglebius * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20290001Sglebius * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21290001Sglebius * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22290001Sglebius * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23290001Sglebius * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24290001Sglebius * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25290001Sglebius * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26290001Sglebius * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27290001Sglebius */ 28290001Sglebius#include "evconfig-private.h" 29290001Sglebius 30290001Sglebius#include <sys/types.h> 31290001Sglebius#include <limits.h> 32290001Sglebius#include <string.h> 33290001Sglebius#include <stdlib.h> 34290001Sglebius 35290001Sglebius#include "event2/event.h" 36290001Sglebius#include "event2/event_struct.h" 37290001Sglebius#include "event2/util.h" 38290001Sglebius#include "event2/bufferevent.h" 39290001Sglebius#include "event2/bufferevent_struct.h" 40290001Sglebius#include "event2/buffer.h" 41290001Sglebius 42290001Sglebius#include "ratelim-internal.h" 43290001Sglebius 44290001Sglebius#include "bufferevent-internal.h" 45290001Sglebius#include "mm-internal.h" 46290001Sglebius#include "util-internal.h" 47290001Sglebius#include "event-internal.h" 48290001Sglebius 49290001Sglebiusint 50290001Sglebiusev_token_bucket_init_(struct ev_token_bucket *bucket, 51290001Sglebius const struct ev_token_bucket_cfg *cfg, 52290001Sglebius ev_uint32_t current_tick, 53290001Sglebius int reinitialize) 54290001Sglebius{ 55290001Sglebius if (reinitialize) { 56290001Sglebius /* on reinitialization, we only clip downwards, since we've 57290001Sglebius already used who-knows-how-much bandwidth this tick. We 58290001Sglebius leave "last_updated" as it is; the next update will add the 59290001Sglebius appropriate amount of bandwidth to the bucket. 60290001Sglebius */ 61290001Sglebius if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) 62290001Sglebius bucket->read_limit = cfg->read_maximum; 63290001Sglebius if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) 64290001Sglebius bucket->write_limit = cfg->write_maximum; 65290001Sglebius } else { 66290001Sglebius bucket->read_limit = cfg->read_rate; 67290001Sglebius bucket->write_limit = cfg->write_rate; 68290001Sglebius bucket->last_updated = current_tick; 69290001Sglebius } 70290001Sglebius return 0; 71290001Sglebius} 72290001Sglebius 73290001Sglebiusint 74290001Sglebiusev_token_bucket_update_(struct ev_token_bucket *bucket, 75290001Sglebius const struct ev_token_bucket_cfg *cfg, 76290001Sglebius ev_uint32_t current_tick) 77290001Sglebius{ 78290001Sglebius /* It's okay if the tick number overflows, since we'll just 79290001Sglebius * wrap around when we do the unsigned substraction. */ 80290001Sglebius unsigned n_ticks = current_tick - bucket->last_updated; 81290001Sglebius 82290001Sglebius /* Make sure some ticks actually happened, and that time didn't 83290001Sglebius * roll back. */ 84290001Sglebius if (n_ticks == 0 || n_ticks > INT_MAX) 85290001Sglebius return 0; 86290001Sglebius 87290001Sglebius /* Naively, we would say 88290001Sglebius bucket->limit += n_ticks * cfg->rate; 89290001Sglebius 90290001Sglebius if (bucket->limit > cfg->maximum) 91290001Sglebius bucket->limit = cfg->maximum; 92290001Sglebius 93290001Sglebius But we're worried about overflow, so we do it like this: 94290001Sglebius */ 95290001Sglebius 96290001Sglebius if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) 97290001Sglebius bucket->read_limit = cfg->read_maximum; 98290001Sglebius else 99290001Sglebius bucket->read_limit += n_ticks * cfg->read_rate; 100290001Sglebius 101290001Sglebius 102290001Sglebius if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) 103290001Sglebius bucket->write_limit = cfg->write_maximum; 104290001Sglebius else 105290001Sglebius bucket->write_limit += n_ticks * cfg->write_rate; 106290001Sglebius 107290001Sglebius 108290001Sglebius bucket->last_updated = current_tick; 109290001Sglebius 110290001Sglebius return 1; 111290001Sglebius} 112290001Sglebius 113290001Sglebiusstatic inline void 114290001Sglebiusbufferevent_update_buckets(struct bufferevent_private *bev) 115290001Sglebius{ 116290001Sglebius /* Must hold lock on bev. */ 117290001Sglebius struct timeval now; 118290001Sglebius unsigned tick; 119290001Sglebius event_base_gettimeofday_cached(bev->bev.ev_base, &now); 120290001Sglebius tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); 121290001Sglebius if (tick != bev->rate_limiting->limit.last_updated) 122290001Sglebius ev_token_bucket_update_(&bev->rate_limiting->limit, 123290001Sglebius bev->rate_limiting->cfg, tick); 124290001Sglebius} 125290001Sglebius 126290001Sglebiusev_uint32_t 127290001Sglebiusev_token_bucket_get_tick_(const struct timeval *tv, 128290001Sglebius const struct ev_token_bucket_cfg *cfg) 129290001Sglebius{ 130290001Sglebius /* This computation uses two multiplies and a divide. We could do 131290001Sglebius * fewer if we knew that the tick length was an integer number of 132290001Sglebius * seconds, or if we knew it divided evenly into a second. We should 133290001Sglebius * investigate that more. 134290001Sglebius */ 135290001Sglebius 136290001Sglebius /* We cast to an ev_uint64_t first, since we don't want to overflow 137290001Sglebius * before we do the final divide. */ 138290001Sglebius ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; 139290001Sglebius return (unsigned)(msec / cfg->msec_per_tick); 140290001Sglebius} 141290001Sglebius 142290001Sglebiusstruct ev_token_bucket_cfg * 143290001Sglebiusev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, 144290001Sglebius size_t write_rate, size_t write_burst, 145290001Sglebius const struct timeval *tick_len) 146290001Sglebius{ 147290001Sglebius struct ev_token_bucket_cfg *r; 148290001Sglebius struct timeval g; 149290001Sglebius if (! tick_len) { 150290001Sglebius g.tv_sec = 1; 151290001Sglebius g.tv_usec = 0; 152290001Sglebius tick_len = &g; 153290001Sglebius } 154290001Sglebius if (read_rate > read_burst || write_rate > write_burst || 155290001Sglebius read_rate < 1 || write_rate < 1) 156290001Sglebius return NULL; 157290001Sglebius if (read_rate > EV_RATE_LIMIT_MAX || 158290001Sglebius write_rate > EV_RATE_LIMIT_MAX || 159290001Sglebius read_burst > EV_RATE_LIMIT_MAX || 160290001Sglebius write_burst > EV_RATE_LIMIT_MAX) 161290001Sglebius return NULL; 162290001Sglebius r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); 163290001Sglebius if (!r) 164290001Sglebius return NULL; 165290001Sglebius r->read_rate = read_rate; 166290001Sglebius r->write_rate = write_rate; 167290001Sglebius r->read_maximum = read_burst; 168290001Sglebius r->write_maximum = write_burst; 169290001Sglebius memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); 170290001Sglebius r->msec_per_tick = (tick_len->tv_sec * 1000) + 171290001Sglebius (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; 172290001Sglebius return r; 173290001Sglebius} 174290001Sglebius 175290001Sglebiusvoid 176290001Sglebiusev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) 177290001Sglebius{ 178290001Sglebius mm_free(cfg); 179290001Sglebius} 180290001Sglebius 181290001Sglebius/* Default values for max_single_read & max_single_write variables. */ 182290001Sglebius#define MAX_SINGLE_READ_DEFAULT 16384 183290001Sglebius#define MAX_SINGLE_WRITE_DEFAULT 16384 184290001Sglebius 185290001Sglebius#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) 186290001Sglebius#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) 187290001Sglebius 188290001Sglebiusstatic int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g); 189290001Sglebiusstatic int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g); 190290001Sglebiusstatic void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g); 191290001Sglebiusstatic void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g); 192290001Sglebius 193290001Sglebius/** Helper: figure out the maximum amount we should write if is_write, or 194290001Sglebius the maximum amount we should read if is_read. Return that maximum, or 195290001Sglebius 0 if our bucket is wholly exhausted. 196290001Sglebius */ 197290001Sglebiusstatic inline ev_ssize_t 198290001Sglebiusbufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write) 199290001Sglebius{ 200290001Sglebius /* needs lock on bev. */ 201290001Sglebius ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; 202290001Sglebius 203290001Sglebius#define LIM(x) \ 204290001Sglebius (is_write ? (x).write_limit : (x).read_limit) 205290001Sglebius 206290001Sglebius#define GROUP_SUSPENDED(g) \ 207290001Sglebius (is_write ? (g)->write_suspended : (g)->read_suspended) 208290001Sglebius 209290001Sglebius /* Sets max_so_far to MIN(x, max_so_far) */ 210290001Sglebius#define CLAMPTO(x) \ 211290001Sglebius do { \ 212290001Sglebius if (max_so_far > (x)) \ 213290001Sglebius max_so_far = (x); \ 214290001Sglebius } while (0); 215290001Sglebius 216290001Sglebius if (!bev->rate_limiting) 217290001Sglebius return max_so_far; 218290001Sglebius 219290001Sglebius /* If rate-limiting is enabled at all, update the appropriate 220290001Sglebius bucket, and take the smaller of our rate limit and the group 221290001Sglebius rate limit. 222290001Sglebius */ 223290001Sglebius 224290001Sglebius if (bev->rate_limiting->cfg) { 225290001Sglebius bufferevent_update_buckets(bev); 226290001Sglebius max_so_far = LIM(bev->rate_limiting->limit); 227290001Sglebius } 228290001Sglebius if (bev->rate_limiting->group) { 229290001Sglebius struct bufferevent_rate_limit_group *g = 230290001Sglebius bev->rate_limiting->group; 231290001Sglebius ev_ssize_t share; 232290001Sglebius LOCK_GROUP(g); 233290001Sglebius if (GROUP_SUSPENDED(g)) { 234290001Sglebius /* We can get here if we failed to lock this 235290001Sglebius * particular bufferevent while suspending the whole 236290001Sglebius * group. */ 237290001Sglebius if (is_write) 238290001Sglebius bufferevent_suspend_write_(&bev->bev, 239290001Sglebius BEV_SUSPEND_BW_GROUP); 240290001Sglebius else 241290001Sglebius bufferevent_suspend_read_(&bev->bev, 242290001Sglebius BEV_SUSPEND_BW_GROUP); 243290001Sglebius share = 0; 244290001Sglebius } else { 245290001Sglebius /* XXXX probably we should divide among the active 246290001Sglebius * members, not the total members. */ 247290001Sglebius share = LIM(g->rate_limit) / g->n_members; 248290001Sglebius if (share < g->min_share) 249290001Sglebius share = g->min_share; 250290001Sglebius } 251290001Sglebius UNLOCK_GROUP(g); 252290001Sglebius CLAMPTO(share); 253290001Sglebius } 254290001Sglebius 255290001Sglebius if (max_so_far < 0) 256290001Sglebius max_so_far = 0; 257290001Sglebius return max_so_far; 258290001Sglebius} 259290001Sglebius 260290001Sglebiusev_ssize_t 261290001Sglebiusbufferevent_get_read_max_(struct bufferevent_private *bev) 262290001Sglebius{ 263290001Sglebius return bufferevent_get_rlim_max_(bev, 0); 264290001Sglebius} 265290001Sglebius 266290001Sglebiusev_ssize_t 267290001Sglebiusbufferevent_get_write_max_(struct bufferevent_private *bev) 268290001Sglebius{ 269290001Sglebius return bufferevent_get_rlim_max_(bev, 1); 270290001Sglebius} 271290001Sglebius 272290001Sglebiusint 273290001Sglebiusbufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 274290001Sglebius{ 275290001Sglebius /* XXXXX Make sure all users of this function check its return value */ 276290001Sglebius int r = 0; 277290001Sglebius /* need to hold lock on bev */ 278290001Sglebius if (!bev->rate_limiting) 279290001Sglebius return 0; 280290001Sglebius 281290001Sglebius if (bev->rate_limiting->cfg) { 282290001Sglebius bev->rate_limiting->limit.read_limit -= bytes; 283290001Sglebius if (bev->rate_limiting->limit.read_limit <= 0) { 284290001Sglebius bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW); 285290001Sglebius if (event_add(&bev->rate_limiting->refill_bucket_event, 286290001Sglebius &bev->rate_limiting->cfg->tick_timeout) < 0) 287290001Sglebius r = -1; 288290001Sglebius } else if (bev->read_suspended & BEV_SUSPEND_BW) { 289290001Sglebius if (!(bev->write_suspended & BEV_SUSPEND_BW)) 290290001Sglebius event_del(&bev->rate_limiting->refill_bucket_event); 291290001Sglebius bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 292290001Sglebius } 293290001Sglebius } 294290001Sglebius 295290001Sglebius if (bev->rate_limiting->group) { 296290001Sglebius LOCK_GROUP(bev->rate_limiting->group); 297290001Sglebius bev->rate_limiting->group->rate_limit.read_limit -= bytes; 298290001Sglebius bev->rate_limiting->group->total_read += bytes; 299290001Sglebius if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { 300290001Sglebius bev_group_suspend_reading_(bev->rate_limiting->group); 301290001Sglebius } else if (bev->rate_limiting->group->read_suspended) { 302290001Sglebius bev_group_unsuspend_reading_(bev->rate_limiting->group); 303290001Sglebius } 304290001Sglebius UNLOCK_GROUP(bev->rate_limiting->group); 305290001Sglebius } 306290001Sglebius 307290001Sglebius return r; 308290001Sglebius} 309290001Sglebius 310290001Sglebiusint 311290001Sglebiusbufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 312290001Sglebius{ 313290001Sglebius /* XXXXX Make sure all users of this function check its return value */ 314290001Sglebius int r = 0; 315290001Sglebius /* need to hold lock */ 316290001Sglebius if (!bev->rate_limiting) 317290001Sglebius return 0; 318290001Sglebius 319290001Sglebius if (bev->rate_limiting->cfg) { 320290001Sglebius bev->rate_limiting->limit.write_limit -= bytes; 321290001Sglebius if (bev->rate_limiting->limit.write_limit <= 0) { 322290001Sglebius bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW); 323290001Sglebius if (event_add(&bev->rate_limiting->refill_bucket_event, 324290001Sglebius &bev->rate_limiting->cfg->tick_timeout) < 0) 325290001Sglebius r = -1; 326290001Sglebius } else if (bev->write_suspended & BEV_SUSPEND_BW) { 327290001Sglebius if (!(bev->read_suspended & BEV_SUSPEND_BW)) 328290001Sglebius event_del(&bev->rate_limiting->refill_bucket_event); 329290001Sglebius bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 330290001Sglebius } 331290001Sglebius } 332290001Sglebius 333290001Sglebius if (bev->rate_limiting->group) { 334290001Sglebius LOCK_GROUP(bev->rate_limiting->group); 335290001Sglebius bev->rate_limiting->group->rate_limit.write_limit -= bytes; 336290001Sglebius bev->rate_limiting->group->total_written += bytes; 337290001Sglebius if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { 338290001Sglebius bev_group_suspend_writing_(bev->rate_limiting->group); 339290001Sglebius } else if (bev->rate_limiting->group->write_suspended) { 340290001Sglebius bev_group_unsuspend_writing_(bev->rate_limiting->group); 341290001Sglebius } 342290001Sglebius UNLOCK_GROUP(bev->rate_limiting->group); 343290001Sglebius } 344290001Sglebius 345290001Sglebius return r; 346290001Sglebius} 347290001Sglebius 348290001Sglebius/** Stop reading on every bufferevent in <b>g</b> */ 349290001Sglebiusstatic int 350290001Sglebiusbev_group_suspend_reading_(struct bufferevent_rate_limit_group *g) 351290001Sglebius{ 352290001Sglebius /* Needs group lock */ 353290001Sglebius struct bufferevent_private *bev; 354290001Sglebius g->read_suspended = 1; 355290001Sglebius g->pending_unsuspend_read = 0; 356290001Sglebius 357290001Sglebius /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK, 358290001Sglebius to prevent a deadlock. (Ordinarily, the group lock nests inside 359290001Sglebius the bufferevent locks. If we are unable to lock any individual 360290001Sglebius bufferevent, it will find out later when it looks at its limit 361290001Sglebius and sees that its group is suspended.) 362290001Sglebius */ 363290001Sglebius LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 364290001Sglebius if (EVLOCK_TRY_LOCK_(bev->lock)) { 365290001Sglebius bufferevent_suspend_read_(&bev->bev, 366290001Sglebius BEV_SUSPEND_BW_GROUP); 367290001Sglebius EVLOCK_UNLOCK(bev->lock, 0); 368290001Sglebius } 369290001Sglebius } 370290001Sglebius return 0; 371290001Sglebius} 372290001Sglebius 373290001Sglebius/** Stop writing on every bufferevent in <b>g</b> */ 374290001Sglebiusstatic int 375290001Sglebiusbev_group_suspend_writing_(struct bufferevent_rate_limit_group *g) 376290001Sglebius{ 377290001Sglebius /* Needs group lock */ 378290001Sglebius struct bufferevent_private *bev; 379290001Sglebius g->write_suspended = 1; 380290001Sglebius g->pending_unsuspend_write = 0; 381290001Sglebius LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 382290001Sglebius if (EVLOCK_TRY_LOCK_(bev->lock)) { 383290001Sglebius bufferevent_suspend_write_(&bev->bev, 384290001Sglebius BEV_SUSPEND_BW_GROUP); 385290001Sglebius EVLOCK_UNLOCK(bev->lock, 0); 386290001Sglebius } 387290001Sglebius } 388290001Sglebius return 0; 389290001Sglebius} 390290001Sglebius 391290001Sglebius/** Timer callback invoked on a single bufferevent with one or more exhausted 392290001Sglebius buckets when they are ready to refill. */ 393290001Sglebiusstatic void 394290001Sglebiusbev_refill_callback_(evutil_socket_t fd, short what, void *arg) 395290001Sglebius{ 396290001Sglebius unsigned tick; 397290001Sglebius struct timeval now; 398290001Sglebius struct bufferevent_private *bev = arg; 399290001Sglebius int again = 0; 400290001Sglebius BEV_LOCK(&bev->bev); 401290001Sglebius if (!bev->rate_limiting || !bev->rate_limiting->cfg) { 402290001Sglebius BEV_UNLOCK(&bev->bev); 403290001Sglebius return; 404290001Sglebius } 405290001Sglebius 406290001Sglebius /* First, update the bucket */ 407290001Sglebius event_base_gettimeofday_cached(bev->bev.ev_base, &now); 408290001Sglebius tick = ev_token_bucket_get_tick_(&now, 409290001Sglebius bev->rate_limiting->cfg); 410290001Sglebius ev_token_bucket_update_(&bev->rate_limiting->limit, 411290001Sglebius bev->rate_limiting->cfg, 412290001Sglebius tick); 413290001Sglebius 414290001Sglebius /* Now unsuspend any read/write operations as appropriate. */ 415290001Sglebius if ((bev->read_suspended & BEV_SUSPEND_BW)) { 416290001Sglebius if (bev->rate_limiting->limit.read_limit > 0) 417290001Sglebius bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 418290001Sglebius else 419290001Sglebius again = 1; 420290001Sglebius } 421290001Sglebius if ((bev->write_suspended & BEV_SUSPEND_BW)) { 422290001Sglebius if (bev->rate_limiting->limit.write_limit > 0) 423290001Sglebius bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 424290001Sglebius else 425290001Sglebius again = 1; 426290001Sglebius } 427290001Sglebius if (again) { 428290001Sglebius /* One or more of the buckets may need another refill if they 429290001Sglebius started negative. 430290001Sglebius 431290001Sglebius XXXX if we need to be quiet for more ticks, we should 432290001Sglebius maybe figure out what timeout we really want. 433290001Sglebius */ 434290001Sglebius /* XXXX Handle event_add failure somehow */ 435290001Sglebius event_add(&bev->rate_limiting->refill_bucket_event, 436290001Sglebius &bev->rate_limiting->cfg->tick_timeout); 437290001Sglebius } 438290001Sglebius BEV_UNLOCK(&bev->bev); 439290001Sglebius} 440290001Sglebius 441290001Sglebius/** Helper: grab a random element from a bufferevent group. 442290001Sglebius * 443290001Sglebius * Requires that we hold the lock on the group. 444290001Sglebius */ 445290001Sglebiusstatic struct bufferevent_private * 446290001Sglebiusbev_group_random_element_(struct bufferevent_rate_limit_group *group) 447290001Sglebius{ 448290001Sglebius int which; 449290001Sglebius struct bufferevent_private *bev; 450290001Sglebius 451290001Sglebius /* requires group lock */ 452290001Sglebius 453290001Sglebius if (!group->n_members) 454290001Sglebius return NULL; 455290001Sglebius 456290001Sglebius EVUTIL_ASSERT(! LIST_EMPTY(&group->members)); 457290001Sglebius 458290001Sglebius which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members); 459290001Sglebius 460290001Sglebius bev = LIST_FIRST(&group->members); 461290001Sglebius while (which--) 462290001Sglebius bev = LIST_NEXT(bev, rate_limiting->next_in_group); 463290001Sglebius 464290001Sglebius return bev; 465290001Sglebius} 466290001Sglebius 467290001Sglebius/** Iterate over the elements of a rate-limiting group 'g' with a random 468290001Sglebius starting point, assigning each to the variable 'bev', and executing the 469290001Sglebius block 'block'. 470290001Sglebius 471290001Sglebius We do this in a half-baked effort to get fairness among group members. 472290001Sglebius XXX Round-robin or some kind of priority queue would be even more fair. 473290001Sglebius */ 474290001Sglebius#define FOREACH_RANDOM_ORDER(block) \ 475290001Sglebius do { \ 476290001Sglebius first = bev_group_random_element_(g); \ 477290001Sglebius for (bev = first; bev != LIST_END(&g->members); \ 478290001Sglebius bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 479290001Sglebius block ; \ 480290001Sglebius } \ 481290001Sglebius for (bev = LIST_FIRST(&g->members); bev && bev != first; \ 482290001Sglebius bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 483290001Sglebius block ; \ 484290001Sglebius } \ 485290001Sglebius } while (0) 486290001Sglebius 487290001Sglebiusstatic void 488290001Sglebiusbev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g) 489290001Sglebius{ 490290001Sglebius int again = 0; 491290001Sglebius struct bufferevent_private *bev, *first; 492290001Sglebius 493290001Sglebius g->read_suspended = 0; 494290001Sglebius FOREACH_RANDOM_ORDER({ 495290001Sglebius if (EVLOCK_TRY_LOCK_(bev->lock)) { 496290001Sglebius bufferevent_unsuspend_read_(&bev->bev, 497290001Sglebius BEV_SUSPEND_BW_GROUP); 498290001Sglebius EVLOCK_UNLOCK(bev->lock, 0); 499290001Sglebius } else { 500290001Sglebius again = 1; 501290001Sglebius } 502290001Sglebius }); 503290001Sglebius g->pending_unsuspend_read = again; 504290001Sglebius} 505290001Sglebius 506290001Sglebiusstatic void 507290001Sglebiusbev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g) 508290001Sglebius{ 509290001Sglebius int again = 0; 510290001Sglebius struct bufferevent_private *bev, *first; 511290001Sglebius g->write_suspended = 0; 512290001Sglebius 513290001Sglebius FOREACH_RANDOM_ORDER({ 514290001Sglebius if (EVLOCK_TRY_LOCK_(bev->lock)) { 515290001Sglebius bufferevent_unsuspend_write_(&bev->bev, 516290001Sglebius BEV_SUSPEND_BW_GROUP); 517290001Sglebius EVLOCK_UNLOCK(bev->lock, 0); 518290001Sglebius } else { 519290001Sglebius again = 1; 520290001Sglebius } 521290001Sglebius }); 522290001Sglebius g->pending_unsuspend_write = again; 523290001Sglebius} 524290001Sglebius 525290001Sglebius/** Callback invoked every tick to add more elements to the group bucket 526290001Sglebius and unsuspend group members as needed. 527290001Sglebius */ 528290001Sglebiusstatic void 529290001Sglebiusbev_group_refill_callback_(evutil_socket_t fd, short what, void *arg) 530290001Sglebius{ 531290001Sglebius struct bufferevent_rate_limit_group *g = arg; 532290001Sglebius unsigned tick; 533290001Sglebius struct timeval now; 534290001Sglebius 535290001Sglebius event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); 536290001Sglebius 537290001Sglebius LOCK_GROUP(g); 538290001Sglebius 539290001Sglebius tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg); 540290001Sglebius ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick); 541290001Sglebius 542290001Sglebius if (g->pending_unsuspend_read || 543290001Sglebius (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { 544290001Sglebius bev_group_unsuspend_reading_(g); 545290001Sglebius } 546290001Sglebius if (g->pending_unsuspend_write || 547290001Sglebius (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ 548290001Sglebius bev_group_unsuspend_writing_(g); 549290001Sglebius } 550290001Sglebius 551290001Sglebius /* XXXX Rather than waiting to the next tick to unsuspend stuff 552290001Sglebius * with pending_unsuspend_write/read, we should do it on the 553290001Sglebius * next iteration of the mainloop. 554290001Sglebius */ 555290001Sglebius 556290001Sglebius UNLOCK_GROUP(g); 557290001Sglebius} 558290001Sglebius 559290001Sglebiusint 560290001Sglebiusbufferevent_set_rate_limit(struct bufferevent *bev, 561290001Sglebius struct ev_token_bucket_cfg *cfg) 562290001Sglebius{ 563290001Sglebius struct bufferevent_private *bevp = 564290001Sglebius EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 565290001Sglebius int r = -1; 566290001Sglebius struct bufferevent_rate_limit *rlim; 567290001Sglebius struct timeval now; 568290001Sglebius ev_uint32_t tick; 569290001Sglebius int reinit = 0, suspended = 0; 570290001Sglebius /* XXX reference-count cfg */ 571290001Sglebius 572290001Sglebius BEV_LOCK(bev); 573290001Sglebius 574290001Sglebius if (cfg == NULL) { 575290001Sglebius if (bevp->rate_limiting) { 576290001Sglebius rlim = bevp->rate_limiting; 577290001Sglebius rlim->cfg = NULL; 578290001Sglebius bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 579290001Sglebius bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 580290001Sglebius if (event_initialized(&rlim->refill_bucket_event)) 581290001Sglebius event_del(&rlim->refill_bucket_event); 582290001Sglebius } 583290001Sglebius r = 0; 584290001Sglebius goto done; 585290001Sglebius } 586290001Sglebius 587290001Sglebius event_base_gettimeofday_cached(bev->ev_base, &now); 588290001Sglebius tick = ev_token_bucket_get_tick_(&now, cfg); 589290001Sglebius 590290001Sglebius if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { 591290001Sglebius /* no-op */ 592290001Sglebius r = 0; 593290001Sglebius goto done; 594290001Sglebius } 595290001Sglebius if (bevp->rate_limiting == NULL) { 596290001Sglebius rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 597290001Sglebius if (!rlim) 598290001Sglebius goto done; 599290001Sglebius bevp->rate_limiting = rlim; 600290001Sglebius } else { 601290001Sglebius rlim = bevp->rate_limiting; 602290001Sglebius } 603290001Sglebius reinit = rlim->cfg != NULL; 604290001Sglebius 605290001Sglebius rlim->cfg = cfg; 606290001Sglebius ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit); 607290001Sglebius 608290001Sglebius if (reinit) { 609290001Sglebius EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); 610290001Sglebius event_del(&rlim->refill_bucket_event); 611290001Sglebius } 612290001Sglebius event_assign(&rlim->refill_bucket_event, bev->ev_base, 613290001Sglebius -1, EV_FINALIZE, bev_refill_callback_, bevp); 614290001Sglebius 615290001Sglebius if (rlim->limit.read_limit > 0) { 616290001Sglebius bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 617290001Sglebius } else { 618290001Sglebius bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 619290001Sglebius suspended=1; 620290001Sglebius } 621290001Sglebius if (rlim->limit.write_limit > 0) { 622290001Sglebius bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 623290001Sglebius } else { 624290001Sglebius bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 625290001Sglebius suspended = 1; 626290001Sglebius } 627290001Sglebius 628290001Sglebius if (suspended) 629290001Sglebius event_add(&rlim->refill_bucket_event, &cfg->tick_timeout); 630290001Sglebius 631290001Sglebius r = 0; 632290001Sglebius 633290001Sglebiusdone: 634290001Sglebius BEV_UNLOCK(bev); 635290001Sglebius return r; 636290001Sglebius} 637290001Sglebius 638290001Sglebiusstruct bufferevent_rate_limit_group * 639290001Sglebiusbufferevent_rate_limit_group_new(struct event_base *base, 640290001Sglebius const struct ev_token_bucket_cfg *cfg) 641290001Sglebius{ 642290001Sglebius struct bufferevent_rate_limit_group *g; 643290001Sglebius struct timeval now; 644290001Sglebius ev_uint32_t tick; 645290001Sglebius 646290001Sglebius event_base_gettimeofday_cached(base, &now); 647290001Sglebius tick = ev_token_bucket_get_tick_(&now, cfg); 648290001Sglebius 649290001Sglebius g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); 650290001Sglebius if (!g) 651290001Sglebius return NULL; 652290001Sglebius memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 653290001Sglebius LIST_INIT(&g->members); 654290001Sglebius 655290001Sglebius ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); 656290001Sglebius 657290001Sglebius event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, 658290001Sglebius bev_group_refill_callback_, g); 659290001Sglebius /*XXXX handle event_add failure */ 660290001Sglebius event_add(&g->master_refill_event, &cfg->tick_timeout); 661290001Sglebius 662290001Sglebius EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 663290001Sglebius 664290001Sglebius bufferevent_rate_limit_group_set_min_share(g, 64); 665290001Sglebius 666290001Sglebius evutil_weakrand_seed_(&g->weakrand_seed, 667290001Sglebius (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g)); 668290001Sglebius 669290001Sglebius return g; 670290001Sglebius} 671290001Sglebius 672290001Sglebiusint 673290001Sglebiusbufferevent_rate_limit_group_set_cfg( 674290001Sglebius struct bufferevent_rate_limit_group *g, 675290001Sglebius const struct ev_token_bucket_cfg *cfg) 676290001Sglebius{ 677290001Sglebius int same_tick; 678290001Sglebius if (!g || !cfg) 679290001Sglebius return -1; 680290001Sglebius 681290001Sglebius LOCK_GROUP(g); 682290001Sglebius same_tick = evutil_timercmp( 683290001Sglebius &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==); 684290001Sglebius memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 685290001Sglebius 686290001Sglebius if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum) 687290001Sglebius g->rate_limit.read_limit = cfg->read_maximum; 688290001Sglebius if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum) 689290001Sglebius g->rate_limit.write_limit = cfg->write_maximum; 690290001Sglebius 691290001Sglebius if (!same_tick) { 692290001Sglebius /* This can cause a hiccup in the schedule */ 693290001Sglebius event_add(&g->master_refill_event, &cfg->tick_timeout); 694290001Sglebius } 695290001Sglebius 696290001Sglebius /* The new limits might force us to adjust min_share differently. */ 697290001Sglebius bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share); 698290001Sglebius 699290001Sglebius UNLOCK_GROUP(g); 700290001Sglebius return 0; 701290001Sglebius} 702290001Sglebius 703290001Sglebiusint 704290001Sglebiusbufferevent_rate_limit_group_set_min_share( 705290001Sglebius struct bufferevent_rate_limit_group *g, 706290001Sglebius size_t share) 707290001Sglebius{ 708290001Sglebius if (share > EV_SSIZE_MAX) 709290001Sglebius return -1; 710290001Sglebius 711290001Sglebius g->configured_min_share = share; 712290001Sglebius 713290001Sglebius /* Can't set share to less than the one-tick maximum. IOW, at steady 714290001Sglebius * state, at least one connection can go per tick. */ 715290001Sglebius if (share > g->rate_limit_cfg.read_rate) 716290001Sglebius share = g->rate_limit_cfg.read_rate; 717290001Sglebius if (share > g->rate_limit_cfg.write_rate) 718290001Sglebius share = g->rate_limit_cfg.write_rate; 719290001Sglebius 720290001Sglebius g->min_share = share; 721290001Sglebius return 0; 722290001Sglebius} 723290001Sglebius 724290001Sglebiusvoid 725290001Sglebiusbufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g) 726290001Sglebius{ 727290001Sglebius LOCK_GROUP(g); 728290001Sglebius EVUTIL_ASSERT(0 == g->n_members); 729290001Sglebius event_del(&g->master_refill_event); 730290001Sglebius UNLOCK_GROUP(g); 731290001Sglebius EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 732290001Sglebius mm_free(g); 733290001Sglebius} 734290001Sglebius 735290001Sglebiusint 736290001Sglebiusbufferevent_add_to_rate_limit_group(struct bufferevent *bev, 737290001Sglebius struct bufferevent_rate_limit_group *g) 738290001Sglebius{ 739290001Sglebius int wsuspend, rsuspend; 740290001Sglebius struct bufferevent_private *bevp = 741290001Sglebius EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 742290001Sglebius BEV_LOCK(bev); 743290001Sglebius 744290001Sglebius if (!bevp->rate_limiting) { 745290001Sglebius struct bufferevent_rate_limit *rlim; 746290001Sglebius rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 747290001Sglebius if (!rlim) { 748290001Sglebius BEV_UNLOCK(bev); 749290001Sglebius return -1; 750290001Sglebius } 751290001Sglebius event_assign(&rlim->refill_bucket_event, bev->ev_base, 752290001Sglebius -1, EV_FINALIZE, bev_refill_callback_, bevp); 753290001Sglebius bevp->rate_limiting = rlim; 754290001Sglebius } 755290001Sglebius 756290001Sglebius if (bevp->rate_limiting->group == g) { 757290001Sglebius BEV_UNLOCK(bev); 758290001Sglebius return 0; 759290001Sglebius } 760290001Sglebius if (bevp->rate_limiting->group) 761290001Sglebius bufferevent_remove_from_rate_limit_group(bev); 762290001Sglebius 763290001Sglebius LOCK_GROUP(g); 764290001Sglebius bevp->rate_limiting->group = g; 765290001Sglebius ++g->n_members; 766290001Sglebius LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); 767290001Sglebius 768290001Sglebius rsuspend = g->read_suspended; 769290001Sglebius wsuspend = g->write_suspended; 770290001Sglebius 771290001Sglebius UNLOCK_GROUP(g); 772290001Sglebius 773290001Sglebius if (rsuspend) 774290001Sglebius bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); 775290001Sglebius if (wsuspend) 776290001Sglebius bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); 777290001Sglebius 778290001Sglebius BEV_UNLOCK(bev); 779290001Sglebius return 0; 780290001Sglebius} 781290001Sglebius 782290001Sglebiusint 783290001Sglebiusbufferevent_remove_from_rate_limit_group(struct bufferevent *bev) 784290001Sglebius{ 785290001Sglebius return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); 786290001Sglebius} 787290001Sglebius 788290001Sglebiusint 789290001Sglebiusbufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, 790290001Sglebius int unsuspend) 791290001Sglebius{ 792290001Sglebius struct bufferevent_private *bevp = 793290001Sglebius EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 794290001Sglebius BEV_LOCK(bev); 795290001Sglebius if (bevp->rate_limiting && bevp->rate_limiting->group) { 796290001Sglebius struct bufferevent_rate_limit_group *g = 797290001Sglebius bevp->rate_limiting->group; 798290001Sglebius LOCK_GROUP(g); 799290001Sglebius bevp->rate_limiting->group = NULL; 800290001Sglebius --g->n_members; 801290001Sglebius LIST_REMOVE(bevp, rate_limiting->next_in_group); 802290001Sglebius UNLOCK_GROUP(g); 803290001Sglebius } 804290001Sglebius if (unsuspend) { 805290001Sglebius bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); 806290001Sglebius bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); 807290001Sglebius } 808290001Sglebius BEV_UNLOCK(bev); 809290001Sglebius return 0; 810290001Sglebius} 811290001Sglebius 812290001Sglebius/* === 813290001Sglebius * API functions to expose rate limits. 814290001Sglebius * 815290001Sglebius * Don't use these from inside Libevent; they're meant to be for use by 816290001Sglebius * the program. 817290001Sglebius * === */ 818290001Sglebius 819290001Sglebius/* Mostly you don't want to use this function from inside libevent; 820290001Sglebius * bufferevent_get_read_max_() is more likely what you want*/ 821290001Sglebiusev_ssize_t 822290001Sglebiusbufferevent_get_read_limit(struct bufferevent *bev) 823290001Sglebius{ 824290001Sglebius ev_ssize_t r; 825290001Sglebius struct bufferevent_private *bevp; 826290001Sglebius BEV_LOCK(bev); 827290001Sglebius bevp = BEV_UPCAST(bev); 828290001Sglebius if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 829290001Sglebius bufferevent_update_buckets(bevp); 830290001Sglebius r = bevp->rate_limiting->limit.read_limit; 831290001Sglebius } else { 832290001Sglebius r = EV_SSIZE_MAX; 833290001Sglebius } 834290001Sglebius BEV_UNLOCK(bev); 835290001Sglebius return r; 836290001Sglebius} 837290001Sglebius 838290001Sglebius/* Mostly you don't want to use this function from inside libevent; 839290001Sglebius * bufferevent_get_write_max_() is more likely what you want*/ 840290001Sglebiusev_ssize_t 841290001Sglebiusbufferevent_get_write_limit(struct bufferevent *bev) 842290001Sglebius{ 843290001Sglebius ev_ssize_t r; 844290001Sglebius struct bufferevent_private *bevp; 845290001Sglebius BEV_LOCK(bev); 846290001Sglebius bevp = BEV_UPCAST(bev); 847290001Sglebius if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 848290001Sglebius bufferevent_update_buckets(bevp); 849290001Sglebius r = bevp->rate_limiting->limit.write_limit; 850290001Sglebius } else { 851290001Sglebius r = EV_SSIZE_MAX; 852290001Sglebius } 853290001Sglebius BEV_UNLOCK(bev); 854290001Sglebius return r; 855290001Sglebius} 856290001Sglebius 857290001Sglebiusint 858290001Sglebiusbufferevent_set_max_single_read(struct bufferevent *bev, size_t size) 859290001Sglebius{ 860290001Sglebius struct bufferevent_private *bevp; 861290001Sglebius BEV_LOCK(bev); 862290001Sglebius bevp = BEV_UPCAST(bev); 863290001Sglebius if (size == 0 || size > EV_SSIZE_MAX) 864290001Sglebius bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; 865290001Sglebius else 866290001Sglebius bevp->max_single_read = size; 867290001Sglebius BEV_UNLOCK(bev); 868290001Sglebius return 0; 869290001Sglebius} 870290001Sglebius 871290001Sglebiusint 872290001Sglebiusbufferevent_set_max_single_write(struct bufferevent *bev, size_t size) 873290001Sglebius{ 874290001Sglebius struct bufferevent_private *bevp; 875290001Sglebius BEV_LOCK(bev); 876290001Sglebius bevp = BEV_UPCAST(bev); 877290001Sglebius if (size == 0 || size > EV_SSIZE_MAX) 878290001Sglebius bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 879290001Sglebius else 880290001Sglebius bevp->max_single_write = size; 881290001Sglebius BEV_UNLOCK(bev); 882290001Sglebius return 0; 883290001Sglebius} 884290001Sglebius 885290001Sglebiusev_ssize_t 886290001Sglebiusbufferevent_get_max_single_read(struct bufferevent *bev) 887290001Sglebius{ 888290001Sglebius ev_ssize_t r; 889290001Sglebius 890290001Sglebius BEV_LOCK(bev); 891290001Sglebius r = BEV_UPCAST(bev)->max_single_read; 892290001Sglebius BEV_UNLOCK(bev); 893290001Sglebius return r; 894290001Sglebius} 895290001Sglebius 896290001Sglebiusev_ssize_t 897290001Sglebiusbufferevent_get_max_single_write(struct bufferevent *bev) 898290001Sglebius{ 899290001Sglebius ev_ssize_t r; 900290001Sglebius 901290001Sglebius BEV_LOCK(bev); 902290001Sglebius r = BEV_UPCAST(bev)->max_single_write; 903290001Sglebius BEV_UNLOCK(bev); 904290001Sglebius return r; 905290001Sglebius} 906290001Sglebius 907290001Sglebiusev_ssize_t 908290001Sglebiusbufferevent_get_max_to_read(struct bufferevent *bev) 909290001Sglebius{ 910290001Sglebius ev_ssize_t r; 911290001Sglebius BEV_LOCK(bev); 912290001Sglebius r = bufferevent_get_read_max_(BEV_UPCAST(bev)); 913290001Sglebius BEV_UNLOCK(bev); 914290001Sglebius return r; 915290001Sglebius} 916290001Sglebius 917290001Sglebiusev_ssize_t 918290001Sglebiusbufferevent_get_max_to_write(struct bufferevent *bev) 919290001Sglebius{ 920290001Sglebius ev_ssize_t r; 921290001Sglebius BEV_LOCK(bev); 922290001Sglebius r = bufferevent_get_write_max_(BEV_UPCAST(bev)); 923290001Sglebius BEV_UNLOCK(bev); 924290001Sglebius return r; 925290001Sglebius} 926290001Sglebius 927290001Sglebiusconst struct ev_token_bucket_cfg * 928290001Sglebiusbufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { 929290001Sglebius struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 930290001Sglebius struct ev_token_bucket_cfg *cfg; 931290001Sglebius 932290001Sglebius BEV_LOCK(bev); 933290001Sglebius 934290001Sglebius if (bufev_private->rate_limiting) { 935290001Sglebius cfg = bufev_private->rate_limiting->cfg; 936290001Sglebius } else { 937290001Sglebius cfg = NULL; 938290001Sglebius } 939290001Sglebius 940290001Sglebius BEV_UNLOCK(bev); 941290001Sglebius 942290001Sglebius return cfg; 943290001Sglebius} 944290001Sglebius 945290001Sglebius/* Mostly you don't want to use this function from inside libevent; 946290001Sglebius * bufferevent_get_read_max_() is more likely what you want*/ 947290001Sglebiusev_ssize_t 948290001Sglebiusbufferevent_rate_limit_group_get_read_limit( 949290001Sglebius struct bufferevent_rate_limit_group *grp) 950290001Sglebius{ 951290001Sglebius ev_ssize_t r; 952290001Sglebius LOCK_GROUP(grp); 953290001Sglebius r = grp->rate_limit.read_limit; 954290001Sglebius UNLOCK_GROUP(grp); 955290001Sglebius return r; 956290001Sglebius} 957290001Sglebius 958290001Sglebius/* Mostly you don't want to use this function from inside libevent; 959290001Sglebius * bufferevent_get_write_max_() is more likely what you want. */ 960290001Sglebiusev_ssize_t 961290001Sglebiusbufferevent_rate_limit_group_get_write_limit( 962290001Sglebius struct bufferevent_rate_limit_group *grp) 963290001Sglebius{ 964290001Sglebius ev_ssize_t r; 965290001Sglebius LOCK_GROUP(grp); 966290001Sglebius r = grp->rate_limit.write_limit; 967290001Sglebius UNLOCK_GROUP(grp); 968290001Sglebius return r; 969290001Sglebius} 970290001Sglebius 971290001Sglebiusint 972290001Sglebiusbufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) 973290001Sglebius{ 974290001Sglebius int r = 0; 975290001Sglebius ev_ssize_t old_limit, new_limit; 976290001Sglebius struct bufferevent_private *bevp; 977290001Sglebius BEV_LOCK(bev); 978290001Sglebius bevp = BEV_UPCAST(bev); 979290001Sglebius EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 980290001Sglebius old_limit = bevp->rate_limiting->limit.read_limit; 981290001Sglebius 982290001Sglebius new_limit = (bevp->rate_limiting->limit.read_limit -= decr); 983290001Sglebius if (old_limit > 0 && new_limit <= 0) { 984290001Sglebius bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 985290001Sglebius if (event_add(&bevp->rate_limiting->refill_bucket_event, 986290001Sglebius &bevp->rate_limiting->cfg->tick_timeout) < 0) 987290001Sglebius r = -1; 988290001Sglebius } else if (old_limit <= 0 && new_limit > 0) { 989290001Sglebius if (!(bevp->write_suspended & BEV_SUSPEND_BW)) 990290001Sglebius event_del(&bevp->rate_limiting->refill_bucket_event); 991290001Sglebius bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 992290001Sglebius } 993290001Sglebius 994290001Sglebius BEV_UNLOCK(bev); 995290001Sglebius return r; 996290001Sglebius} 997290001Sglebius 998290001Sglebiusint 999290001Sglebiusbufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) 1000290001Sglebius{ 1001290001Sglebius /* XXXX this is mostly copy-and-paste from 1002290001Sglebius * bufferevent_decrement_read_limit */ 1003290001Sglebius int r = 0; 1004290001Sglebius ev_ssize_t old_limit, new_limit; 1005290001Sglebius struct bufferevent_private *bevp; 1006290001Sglebius BEV_LOCK(bev); 1007290001Sglebius bevp = BEV_UPCAST(bev); 1008290001Sglebius EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 1009290001Sglebius old_limit = bevp->rate_limiting->limit.write_limit; 1010290001Sglebius 1011290001Sglebius new_limit = (bevp->rate_limiting->limit.write_limit -= decr); 1012290001Sglebius if (old_limit > 0 && new_limit <= 0) { 1013290001Sglebius bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 1014290001Sglebius if (event_add(&bevp->rate_limiting->refill_bucket_event, 1015290001Sglebius &bevp->rate_limiting->cfg->tick_timeout) < 0) 1016290001Sglebius r = -1; 1017290001Sglebius } else if (old_limit <= 0 && new_limit > 0) { 1018290001Sglebius if (!(bevp->read_suspended & BEV_SUSPEND_BW)) 1019290001Sglebius event_del(&bevp->rate_limiting->refill_bucket_event); 1020290001Sglebius bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 1021290001Sglebius } 1022290001Sglebius 1023290001Sglebius BEV_UNLOCK(bev); 1024290001Sglebius return r; 1025290001Sglebius} 1026290001Sglebius 1027290001Sglebiusint 1028290001Sglebiusbufferevent_rate_limit_group_decrement_read( 1029290001Sglebius struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1030290001Sglebius{ 1031290001Sglebius int r = 0; 1032290001Sglebius ev_ssize_t old_limit, new_limit; 1033290001Sglebius LOCK_GROUP(grp); 1034290001Sglebius old_limit = grp->rate_limit.read_limit; 1035290001Sglebius new_limit = (grp->rate_limit.read_limit -= decr); 1036290001Sglebius 1037290001Sglebius if (old_limit > 0 && new_limit <= 0) { 1038290001Sglebius bev_group_suspend_reading_(grp); 1039290001Sglebius } else if (old_limit <= 0 && new_limit > 0) { 1040290001Sglebius bev_group_unsuspend_reading_(grp); 1041290001Sglebius } 1042290001Sglebius 1043290001Sglebius UNLOCK_GROUP(grp); 1044290001Sglebius return r; 1045290001Sglebius} 1046290001Sglebius 1047290001Sglebiusint 1048290001Sglebiusbufferevent_rate_limit_group_decrement_write( 1049290001Sglebius struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1050290001Sglebius{ 1051290001Sglebius int r = 0; 1052290001Sglebius ev_ssize_t old_limit, new_limit; 1053290001Sglebius LOCK_GROUP(grp); 1054290001Sglebius old_limit = grp->rate_limit.write_limit; 1055290001Sglebius new_limit = (grp->rate_limit.write_limit -= decr); 1056290001Sglebius 1057290001Sglebius if (old_limit > 0 && new_limit <= 0) { 1058290001Sglebius bev_group_suspend_writing_(grp); 1059290001Sglebius } else if (old_limit <= 0 && new_limit > 0) { 1060290001Sglebius bev_group_unsuspend_writing_(grp); 1061290001Sglebius } 1062290001Sglebius 1063290001Sglebius UNLOCK_GROUP(grp); 1064290001Sglebius return r; 1065290001Sglebius} 1066290001Sglebius 1067290001Sglebiusvoid 1068290001Sglebiusbufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp, 1069290001Sglebius ev_uint64_t *total_read_out, ev_uint64_t *total_written_out) 1070290001Sglebius{ 1071290001Sglebius EVUTIL_ASSERT(grp != NULL); 1072290001Sglebius if (total_read_out) 1073290001Sglebius *total_read_out = grp->total_read; 1074290001Sglebius if (total_written_out) 1075290001Sglebius *total_written_out = grp->total_written; 1076290001Sglebius} 1077290001Sglebius 1078290001Sglebiusvoid 1079290001Sglebiusbufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp) 1080290001Sglebius{ 1081290001Sglebius grp->total_read = grp->total_written = 0; 1082290001Sglebius} 1083290001Sglebius 1084290001Sglebiusint 1085290001Sglebiusbufferevent_ratelim_init_(struct bufferevent_private *bev) 1086290001Sglebius{ 1087290001Sglebius bev->rate_limiting = NULL; 1088290001Sglebius bev->max_single_read = MAX_SINGLE_READ_DEFAULT; 1089290001Sglebius bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 1090290001Sglebius 1091290001Sglebius return 0; 1092290001Sglebius} 1093