1275970Scy/* 2275970Scy * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3275970Scy * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 4275970Scy * All rights reserved. 5275970Scy * 6275970Scy * Redistribution and use in source and binary forms, with or without 7275970Scy * modification, are permitted provided that the following conditions 8275970Scy * are met: 9275970Scy * 1. Redistributions of source code must retain the above copyright 10275970Scy * notice, this list of conditions and the following disclaimer. 11275970Scy * 2. Redistributions in binary form must reproduce the above copyright 12275970Scy * notice, this list of conditions and the following disclaimer in the 13275970Scy * documentation and/or other materials provided with the distribution. 14275970Scy * 3. The name of the author may not be used to endorse or promote products 15275970Scy * derived from this software without specific prior written permission. 16275970Scy * 17275970Scy * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18275970Scy * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19275970Scy * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20275970Scy * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21275970Scy * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22275970Scy * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23275970Scy * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24275970Scy * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25275970Scy * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26275970Scy * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27275970Scy */ 28275970Scy#include "evconfig-private.h" 29275970Scy 30275970Scy#include <sys/types.h> 31275970Scy#include <limits.h> 32275970Scy#include <string.h> 33275970Scy#include <stdlib.h> 34275970Scy 35275970Scy#include "event2/event.h" 36275970Scy#include "event2/event_struct.h" 37275970Scy#include "event2/util.h" 38275970Scy#include "event2/bufferevent.h" 39275970Scy#include "event2/bufferevent_struct.h" 40275970Scy#include "event2/buffer.h" 41275970Scy 42275970Scy#include "ratelim-internal.h" 43275970Scy 44275970Scy#include "bufferevent-internal.h" 45275970Scy#include "mm-internal.h" 46275970Scy#include "util-internal.h" 47275970Scy#include "event-internal.h" 48275970Scy 49275970Scyint 50275970Scyev_token_bucket_init_(struct ev_token_bucket *bucket, 51275970Scy const struct ev_token_bucket_cfg *cfg, 52275970Scy ev_uint32_t current_tick, 53275970Scy int reinitialize) 54275970Scy{ 55275970Scy if (reinitialize) { 56275970Scy /* on reinitialization, we only clip downwards, since we've 57275970Scy already used who-knows-how-much bandwidth this tick. We 58275970Scy leave "last_updated" as it is; the next update will add the 59275970Scy appropriate amount of bandwidth to the bucket. 60275970Scy */ 61275970Scy if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) 62275970Scy bucket->read_limit = cfg->read_maximum; 63275970Scy if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) 64275970Scy bucket->write_limit = cfg->write_maximum; 65275970Scy } else { 66275970Scy bucket->read_limit = cfg->read_rate; 67275970Scy bucket->write_limit = cfg->write_rate; 68275970Scy bucket->last_updated = current_tick; 69275970Scy } 70275970Scy return 0; 71275970Scy} 72275970Scy 73275970Scyint 74275970Scyev_token_bucket_update_(struct ev_token_bucket *bucket, 75275970Scy const struct ev_token_bucket_cfg *cfg, 76275970Scy ev_uint32_t current_tick) 77275970Scy{ 78275970Scy /* It's okay if the tick number overflows, since we'll just 79275970Scy * wrap around when we do the unsigned substraction. */ 80275970Scy unsigned n_ticks = current_tick - bucket->last_updated; 81275970Scy 82275970Scy /* Make sure some ticks actually happened, and that time didn't 83275970Scy * roll back. */ 84275970Scy if (n_ticks == 0 || n_ticks > INT_MAX) 85275970Scy return 0; 86275970Scy 87275970Scy /* Naively, we would say 88275970Scy bucket->limit += n_ticks * cfg->rate; 89275970Scy 90275970Scy if (bucket->limit > cfg->maximum) 91275970Scy bucket->limit = cfg->maximum; 92275970Scy 93275970Scy But we're worried about overflow, so we do it like this: 94275970Scy */ 95275970Scy 96275970Scy if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) 97275970Scy bucket->read_limit = cfg->read_maximum; 98275970Scy else 99275970Scy bucket->read_limit += n_ticks * cfg->read_rate; 100275970Scy 101275970Scy 102275970Scy if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) 103275970Scy bucket->write_limit = cfg->write_maximum; 104275970Scy else 105275970Scy bucket->write_limit += n_ticks * cfg->write_rate; 106275970Scy 107275970Scy 108275970Scy bucket->last_updated = current_tick; 109275970Scy 110275970Scy return 1; 111275970Scy} 112275970Scy 113275970Scystatic inline void 114275970Scybufferevent_update_buckets(struct bufferevent_private *bev) 115275970Scy{ 116275970Scy /* Must hold lock on bev. */ 117275970Scy struct timeval now; 118275970Scy unsigned tick; 119275970Scy event_base_gettimeofday_cached(bev->bev.ev_base, &now); 120275970Scy tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); 121275970Scy if (tick != bev->rate_limiting->limit.last_updated) 122275970Scy ev_token_bucket_update_(&bev->rate_limiting->limit, 123275970Scy bev->rate_limiting->cfg, tick); 124275970Scy} 125275970Scy 126275970Scyev_uint32_t 127275970Scyev_token_bucket_get_tick_(const struct timeval *tv, 128275970Scy const struct ev_token_bucket_cfg *cfg) 129275970Scy{ 130275970Scy /* This computation uses two multiplies and a divide. We could do 131275970Scy * fewer if we knew that the tick length was an integer number of 132275970Scy * seconds, or if we knew it divided evenly into a second. We should 133275970Scy * investigate that more. 134275970Scy */ 135275970Scy 136275970Scy /* We cast to an ev_uint64_t first, since we don't want to overflow 137275970Scy * before we do the final divide. */ 138275970Scy ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; 139275970Scy return (unsigned)(msec / cfg->msec_per_tick); 140275970Scy} 141275970Scy 142275970Scystruct ev_token_bucket_cfg * 143275970Scyev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, 144275970Scy size_t write_rate, size_t write_burst, 145275970Scy const struct timeval *tick_len) 146275970Scy{ 147275970Scy struct ev_token_bucket_cfg *r; 148275970Scy struct timeval g; 149275970Scy if (! tick_len) { 150275970Scy g.tv_sec = 1; 151275970Scy g.tv_usec = 0; 152275970Scy tick_len = &g; 153275970Scy } 154275970Scy if (read_rate > read_burst || write_rate > write_burst || 155275970Scy read_rate < 1 || write_rate < 1) 156275970Scy return NULL; 157275970Scy if (read_rate > EV_RATE_LIMIT_MAX || 158275970Scy write_rate > EV_RATE_LIMIT_MAX || 159275970Scy read_burst > EV_RATE_LIMIT_MAX || 160275970Scy write_burst > EV_RATE_LIMIT_MAX) 161275970Scy return NULL; 162275970Scy r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); 163275970Scy if (!r) 164275970Scy return NULL; 165275970Scy r->read_rate = read_rate; 166275970Scy r->write_rate = write_rate; 167275970Scy r->read_maximum = read_burst; 168275970Scy r->write_maximum = write_burst; 169275970Scy memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); 170275970Scy r->msec_per_tick = (tick_len->tv_sec * 1000) + 171275970Scy (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; 172275970Scy return r; 173275970Scy} 174275970Scy 175275970Scyvoid 176275970Scyev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) 177275970Scy{ 178275970Scy mm_free(cfg); 179275970Scy} 180275970Scy 181275970Scy/* Default values for max_single_read & max_single_write variables. */ 182275970Scy#define MAX_SINGLE_READ_DEFAULT 16384 183275970Scy#define MAX_SINGLE_WRITE_DEFAULT 16384 184275970Scy 185275970Scy#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) 186275970Scy#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) 187275970Scy 188275970Scystatic int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g); 189275970Scystatic int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g); 190275970Scystatic void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g); 191275970Scystatic void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g); 192275970Scy 193275970Scy/** Helper: figure out the maximum amount we should write if is_write, or 194275970Scy the maximum amount we should read if is_read. Return that maximum, or 195275970Scy 0 if our bucket is wholly exhausted. 196275970Scy */ 197275970Scystatic inline ev_ssize_t 198275970Scybufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write) 199275970Scy{ 200275970Scy /* needs lock on bev. */ 201275970Scy ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; 202275970Scy 203275970Scy#define LIM(x) \ 204275970Scy (is_write ? (x).write_limit : (x).read_limit) 205275970Scy 206275970Scy#define GROUP_SUSPENDED(g) \ 207275970Scy (is_write ? (g)->write_suspended : (g)->read_suspended) 208275970Scy 209275970Scy /* Sets max_so_far to MIN(x, max_so_far) */ 210275970Scy#define CLAMPTO(x) \ 211275970Scy do { \ 212275970Scy if (max_so_far > (x)) \ 213275970Scy max_so_far = (x); \ 214275970Scy } while (0); 215275970Scy 216275970Scy if (!bev->rate_limiting) 217275970Scy return max_so_far; 218275970Scy 219275970Scy /* If rate-limiting is enabled at all, update the appropriate 220275970Scy bucket, and take the smaller of our rate limit and the group 221275970Scy rate limit. 222275970Scy */ 223275970Scy 224275970Scy if (bev->rate_limiting->cfg) { 225275970Scy bufferevent_update_buckets(bev); 226275970Scy max_so_far = LIM(bev->rate_limiting->limit); 227275970Scy } 228275970Scy if (bev->rate_limiting->group) { 229275970Scy struct bufferevent_rate_limit_group *g = 230275970Scy bev->rate_limiting->group; 231275970Scy ev_ssize_t share; 232275970Scy LOCK_GROUP(g); 233275970Scy if (GROUP_SUSPENDED(g)) { 234275970Scy /* We can get here if we failed to lock this 235275970Scy * particular bufferevent while suspending the whole 236275970Scy * group. */ 237275970Scy if (is_write) 238275970Scy bufferevent_suspend_write_(&bev->bev, 239275970Scy BEV_SUSPEND_BW_GROUP); 240275970Scy else 241275970Scy bufferevent_suspend_read_(&bev->bev, 242275970Scy BEV_SUSPEND_BW_GROUP); 243275970Scy share = 0; 244275970Scy } else { 245275970Scy /* XXXX probably we should divide among the active 246275970Scy * members, not the total members. */ 247275970Scy share = LIM(g->rate_limit) / g->n_members; 248275970Scy if (share < g->min_share) 249275970Scy share = g->min_share; 250275970Scy } 251275970Scy UNLOCK_GROUP(g); 252275970Scy CLAMPTO(share); 253275970Scy } 254275970Scy 255275970Scy if (max_so_far < 0) 256275970Scy max_so_far = 0; 257275970Scy return max_so_far; 258275970Scy} 259275970Scy 260275970Scyev_ssize_t 261275970Scybufferevent_get_read_max_(struct bufferevent_private *bev) 262275970Scy{ 263275970Scy return bufferevent_get_rlim_max_(bev, 0); 264275970Scy} 265275970Scy 266275970Scyev_ssize_t 267275970Scybufferevent_get_write_max_(struct bufferevent_private *bev) 268275970Scy{ 269275970Scy return bufferevent_get_rlim_max_(bev, 1); 270275970Scy} 271275970Scy 272275970Scyint 273275970Scybufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 274275970Scy{ 275275970Scy /* XXXXX Make sure all users of this function check its return value */ 276275970Scy int r = 0; 277275970Scy /* need to hold lock on bev */ 278275970Scy if (!bev->rate_limiting) 279275970Scy return 0; 280275970Scy 281275970Scy if (bev->rate_limiting->cfg) { 282275970Scy bev->rate_limiting->limit.read_limit -= bytes; 283275970Scy if (bev->rate_limiting->limit.read_limit <= 0) { 284275970Scy bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW); 285275970Scy if (event_add(&bev->rate_limiting->refill_bucket_event, 286275970Scy &bev->rate_limiting->cfg->tick_timeout) < 0) 287275970Scy r = -1; 288275970Scy } else if (bev->read_suspended & BEV_SUSPEND_BW) { 289275970Scy if (!(bev->write_suspended & BEV_SUSPEND_BW)) 290275970Scy event_del(&bev->rate_limiting->refill_bucket_event); 291275970Scy bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 292275970Scy } 293275970Scy } 294275970Scy 295275970Scy if (bev->rate_limiting->group) { 296275970Scy LOCK_GROUP(bev->rate_limiting->group); 297275970Scy bev->rate_limiting->group->rate_limit.read_limit -= bytes; 298275970Scy bev->rate_limiting->group->total_read += bytes; 299275970Scy if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { 300275970Scy bev_group_suspend_reading_(bev->rate_limiting->group); 301275970Scy } else if (bev->rate_limiting->group->read_suspended) { 302275970Scy bev_group_unsuspend_reading_(bev->rate_limiting->group); 303275970Scy } 304275970Scy UNLOCK_GROUP(bev->rate_limiting->group); 305275970Scy } 306275970Scy 307275970Scy return r; 308275970Scy} 309275970Scy 310275970Scyint 311275970Scybufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 312275970Scy{ 313275970Scy /* XXXXX Make sure all users of this function check its return value */ 314275970Scy int r = 0; 315275970Scy /* need to hold lock */ 316275970Scy if (!bev->rate_limiting) 317275970Scy return 0; 318275970Scy 319275970Scy if (bev->rate_limiting->cfg) { 320275970Scy bev->rate_limiting->limit.write_limit -= bytes; 321275970Scy if (bev->rate_limiting->limit.write_limit <= 0) { 322275970Scy bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW); 323275970Scy if (event_add(&bev->rate_limiting->refill_bucket_event, 324275970Scy &bev->rate_limiting->cfg->tick_timeout) < 0) 325275970Scy r = -1; 326275970Scy } else if (bev->write_suspended & BEV_SUSPEND_BW) { 327275970Scy if (!(bev->read_suspended & BEV_SUSPEND_BW)) 328275970Scy event_del(&bev->rate_limiting->refill_bucket_event); 329275970Scy bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 330275970Scy } 331275970Scy } 332275970Scy 333275970Scy if (bev->rate_limiting->group) { 334275970Scy LOCK_GROUP(bev->rate_limiting->group); 335275970Scy bev->rate_limiting->group->rate_limit.write_limit -= bytes; 336275970Scy bev->rate_limiting->group->total_written += bytes; 337275970Scy if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { 338275970Scy bev_group_suspend_writing_(bev->rate_limiting->group); 339275970Scy } else if (bev->rate_limiting->group->write_suspended) { 340275970Scy bev_group_unsuspend_writing_(bev->rate_limiting->group); 341275970Scy } 342275970Scy UNLOCK_GROUP(bev->rate_limiting->group); 343275970Scy } 344275970Scy 345275970Scy return r; 346275970Scy} 347275970Scy 348275970Scy/** Stop reading on every bufferevent in <b>g</b> */ 349275970Scystatic int 350275970Scybev_group_suspend_reading_(struct bufferevent_rate_limit_group *g) 351275970Scy{ 352275970Scy /* Needs group lock */ 353275970Scy struct bufferevent_private *bev; 354275970Scy g->read_suspended = 1; 355275970Scy g->pending_unsuspend_read = 0; 356275970Scy 357275970Scy /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK, 358275970Scy to prevent a deadlock. (Ordinarily, the group lock nests inside 359275970Scy the bufferevent locks. If we are unable to lock any individual 360275970Scy bufferevent, it will find out later when it looks at its limit 361275970Scy and sees that its group is suspended.) 362275970Scy */ 363275970Scy LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 364275970Scy if (EVLOCK_TRY_LOCK_(bev->lock)) { 365275970Scy bufferevent_suspend_read_(&bev->bev, 366275970Scy BEV_SUSPEND_BW_GROUP); 367275970Scy EVLOCK_UNLOCK(bev->lock, 0); 368275970Scy } 369275970Scy } 370275970Scy return 0; 371275970Scy} 372275970Scy 373275970Scy/** Stop writing on every bufferevent in <b>g</b> */ 374275970Scystatic int 375275970Scybev_group_suspend_writing_(struct bufferevent_rate_limit_group *g) 376275970Scy{ 377275970Scy /* Needs group lock */ 378275970Scy struct bufferevent_private *bev; 379275970Scy g->write_suspended = 1; 380275970Scy g->pending_unsuspend_write = 0; 381275970Scy LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 382275970Scy if (EVLOCK_TRY_LOCK_(bev->lock)) { 383275970Scy bufferevent_suspend_write_(&bev->bev, 384275970Scy BEV_SUSPEND_BW_GROUP); 385275970Scy EVLOCK_UNLOCK(bev->lock, 0); 386275970Scy } 387275970Scy } 388275970Scy return 0; 389275970Scy} 390275970Scy 391275970Scy/** Timer callback invoked on a single bufferevent with one or more exhausted 392275970Scy buckets when they are ready to refill. */ 393275970Scystatic void 394275970Scybev_refill_callback_(evutil_socket_t fd, short what, void *arg) 395275970Scy{ 396275970Scy unsigned tick; 397275970Scy struct timeval now; 398275970Scy struct bufferevent_private *bev = arg; 399275970Scy int again = 0; 400275970Scy BEV_LOCK(&bev->bev); 401275970Scy if (!bev->rate_limiting || !bev->rate_limiting->cfg) { 402275970Scy BEV_UNLOCK(&bev->bev); 403275970Scy return; 404275970Scy } 405275970Scy 406275970Scy /* First, update the bucket */ 407275970Scy event_base_gettimeofday_cached(bev->bev.ev_base, &now); 408275970Scy tick = ev_token_bucket_get_tick_(&now, 409275970Scy bev->rate_limiting->cfg); 410275970Scy ev_token_bucket_update_(&bev->rate_limiting->limit, 411275970Scy bev->rate_limiting->cfg, 412275970Scy tick); 413275970Scy 414275970Scy /* Now unsuspend any read/write operations as appropriate. */ 415275970Scy if ((bev->read_suspended & BEV_SUSPEND_BW)) { 416275970Scy if (bev->rate_limiting->limit.read_limit > 0) 417275970Scy bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 418275970Scy else 419275970Scy again = 1; 420275970Scy } 421275970Scy if ((bev->write_suspended & BEV_SUSPEND_BW)) { 422275970Scy if (bev->rate_limiting->limit.write_limit > 0) 423275970Scy bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 424275970Scy else 425275970Scy again = 1; 426275970Scy } 427275970Scy if (again) { 428275970Scy /* One or more of the buckets may need another refill if they 429275970Scy started negative. 430275970Scy 431275970Scy XXXX if we need to be quiet for more ticks, we should 432275970Scy maybe figure out what timeout we really want. 433275970Scy */ 434275970Scy /* XXXX Handle event_add failure somehow */ 435275970Scy event_add(&bev->rate_limiting->refill_bucket_event, 436275970Scy &bev->rate_limiting->cfg->tick_timeout); 437275970Scy } 438275970Scy BEV_UNLOCK(&bev->bev); 439275970Scy} 440275970Scy 441275970Scy/** Helper: grab a random element from a bufferevent group. 442275970Scy * 443275970Scy * Requires that we hold the lock on the group. 444275970Scy */ 445275970Scystatic struct bufferevent_private * 446275970Scybev_group_random_element_(struct bufferevent_rate_limit_group *group) 447275970Scy{ 448275970Scy int which; 449275970Scy struct bufferevent_private *bev; 450275970Scy 451275970Scy /* requires group lock */ 452275970Scy 453275970Scy if (!group->n_members) 454275970Scy return NULL; 455275970Scy 456275970Scy EVUTIL_ASSERT(! LIST_EMPTY(&group->members)); 457275970Scy 458275970Scy which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members); 459275970Scy 460275970Scy bev = LIST_FIRST(&group->members); 461275970Scy while (which--) 462275970Scy bev = LIST_NEXT(bev, rate_limiting->next_in_group); 463275970Scy 464275970Scy return bev; 465275970Scy} 466275970Scy 467275970Scy/** Iterate over the elements of a rate-limiting group 'g' with a random 468275970Scy starting point, assigning each to the variable 'bev', and executing the 469275970Scy block 'block'. 470275970Scy 471275970Scy We do this in a half-baked effort to get fairness among group members. 472275970Scy XXX Round-robin or some kind of priority queue would be even more fair. 473275970Scy */ 474275970Scy#define FOREACH_RANDOM_ORDER(block) \ 475275970Scy do { \ 476275970Scy first = bev_group_random_element_(g); \ 477275970Scy for (bev = first; bev != LIST_END(&g->members); \ 478275970Scy bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 479275970Scy block ; \ 480275970Scy } \ 481275970Scy for (bev = LIST_FIRST(&g->members); bev && bev != first; \ 482275970Scy bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 483275970Scy block ; \ 484275970Scy } \ 485275970Scy } while (0) 486275970Scy 487275970Scystatic void 488275970Scybev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g) 489275970Scy{ 490275970Scy int again = 0; 491275970Scy struct bufferevent_private *bev, *first; 492275970Scy 493275970Scy g->read_suspended = 0; 494275970Scy FOREACH_RANDOM_ORDER({ 495275970Scy if (EVLOCK_TRY_LOCK_(bev->lock)) { 496275970Scy bufferevent_unsuspend_read_(&bev->bev, 497275970Scy BEV_SUSPEND_BW_GROUP); 498275970Scy EVLOCK_UNLOCK(bev->lock, 0); 499275970Scy } else { 500275970Scy again = 1; 501275970Scy } 502275970Scy }); 503275970Scy g->pending_unsuspend_read = again; 504275970Scy} 505275970Scy 506275970Scystatic void 507275970Scybev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g) 508275970Scy{ 509275970Scy int again = 0; 510275970Scy struct bufferevent_private *bev, *first; 511275970Scy g->write_suspended = 0; 512275970Scy 513275970Scy FOREACH_RANDOM_ORDER({ 514275970Scy if (EVLOCK_TRY_LOCK_(bev->lock)) { 515275970Scy bufferevent_unsuspend_write_(&bev->bev, 516275970Scy BEV_SUSPEND_BW_GROUP); 517275970Scy EVLOCK_UNLOCK(bev->lock, 0); 518275970Scy } else { 519275970Scy again = 1; 520275970Scy } 521275970Scy }); 522275970Scy g->pending_unsuspend_write = again; 523275970Scy} 524275970Scy 525275970Scy/** Callback invoked every tick to add more elements to the group bucket 526275970Scy and unsuspend group members as needed. 527275970Scy */ 528275970Scystatic void 529275970Scybev_group_refill_callback_(evutil_socket_t fd, short what, void *arg) 530275970Scy{ 531275970Scy struct bufferevent_rate_limit_group *g = arg; 532275970Scy unsigned tick; 533275970Scy struct timeval now; 534275970Scy 535275970Scy event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); 536275970Scy 537275970Scy LOCK_GROUP(g); 538275970Scy 539275970Scy tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg); 540275970Scy ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick); 541275970Scy 542275970Scy if (g->pending_unsuspend_read || 543275970Scy (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { 544275970Scy bev_group_unsuspend_reading_(g); 545275970Scy } 546275970Scy if (g->pending_unsuspend_write || 547275970Scy (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ 548275970Scy bev_group_unsuspend_writing_(g); 549275970Scy } 550275970Scy 551275970Scy /* XXXX Rather than waiting to the next tick to unsuspend stuff 552275970Scy * with pending_unsuspend_write/read, we should do it on the 553275970Scy * next iteration of the mainloop. 554275970Scy */ 555275970Scy 556275970Scy UNLOCK_GROUP(g); 557275970Scy} 558275970Scy 559275970Scyint 560275970Scybufferevent_set_rate_limit(struct bufferevent *bev, 561275970Scy struct ev_token_bucket_cfg *cfg) 562275970Scy{ 563275970Scy struct bufferevent_private *bevp = 564275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 565275970Scy int r = -1; 566275970Scy struct bufferevent_rate_limit *rlim; 567275970Scy struct timeval now; 568275970Scy ev_uint32_t tick; 569275970Scy int reinit = 0, suspended = 0; 570275970Scy /* XXX reference-count cfg */ 571275970Scy 572275970Scy BEV_LOCK(bev); 573275970Scy 574275970Scy if (cfg == NULL) { 575275970Scy if (bevp->rate_limiting) { 576275970Scy rlim = bevp->rate_limiting; 577275970Scy rlim->cfg = NULL; 578275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 579275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 580275970Scy if (event_initialized(&rlim->refill_bucket_event)) 581275970Scy event_del(&rlim->refill_bucket_event); 582275970Scy } 583275970Scy r = 0; 584275970Scy goto done; 585275970Scy } 586275970Scy 587275970Scy event_base_gettimeofday_cached(bev->ev_base, &now); 588275970Scy tick = ev_token_bucket_get_tick_(&now, cfg); 589275970Scy 590275970Scy if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { 591275970Scy /* no-op */ 592275970Scy r = 0; 593275970Scy goto done; 594275970Scy } 595275970Scy if (bevp->rate_limiting == NULL) { 596275970Scy rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 597275970Scy if (!rlim) 598275970Scy goto done; 599275970Scy bevp->rate_limiting = rlim; 600275970Scy } else { 601275970Scy rlim = bevp->rate_limiting; 602275970Scy } 603275970Scy reinit = rlim->cfg != NULL; 604275970Scy 605275970Scy rlim->cfg = cfg; 606275970Scy ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit); 607275970Scy 608275970Scy if (reinit) { 609275970Scy EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); 610275970Scy event_del(&rlim->refill_bucket_event); 611275970Scy } 612275970Scy event_assign(&rlim->refill_bucket_event, bev->ev_base, 613275970Scy -1, EV_FINALIZE, bev_refill_callback_, bevp); 614275970Scy 615275970Scy if (rlim->limit.read_limit > 0) { 616275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 617275970Scy } else { 618275970Scy bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 619275970Scy suspended=1; 620275970Scy } 621275970Scy if (rlim->limit.write_limit > 0) { 622275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 623275970Scy } else { 624275970Scy bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 625275970Scy suspended = 1; 626275970Scy } 627275970Scy 628275970Scy if (suspended) 629275970Scy event_add(&rlim->refill_bucket_event, &cfg->tick_timeout); 630275970Scy 631275970Scy r = 0; 632275970Scy 633275970Scydone: 634275970Scy BEV_UNLOCK(bev); 635275970Scy return r; 636275970Scy} 637275970Scy 638275970Scystruct bufferevent_rate_limit_group * 639275970Scybufferevent_rate_limit_group_new(struct event_base *base, 640275970Scy const struct ev_token_bucket_cfg *cfg) 641275970Scy{ 642275970Scy struct bufferevent_rate_limit_group *g; 643275970Scy struct timeval now; 644275970Scy ev_uint32_t tick; 645275970Scy 646275970Scy event_base_gettimeofday_cached(base, &now); 647275970Scy tick = ev_token_bucket_get_tick_(&now, cfg); 648275970Scy 649275970Scy g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); 650275970Scy if (!g) 651275970Scy return NULL; 652275970Scy memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 653275970Scy LIST_INIT(&g->members); 654275970Scy 655275970Scy ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); 656275970Scy 657275970Scy event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, 658275970Scy bev_group_refill_callback_, g); 659275970Scy /*XXXX handle event_add failure */ 660275970Scy event_add(&g->master_refill_event, &cfg->tick_timeout); 661275970Scy 662275970Scy EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 663275970Scy 664275970Scy bufferevent_rate_limit_group_set_min_share(g, 64); 665275970Scy 666275970Scy evutil_weakrand_seed_(&g->weakrand_seed, 667275970Scy (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g)); 668275970Scy 669275970Scy return g; 670275970Scy} 671275970Scy 672275970Scyint 673275970Scybufferevent_rate_limit_group_set_cfg( 674275970Scy struct bufferevent_rate_limit_group *g, 675275970Scy const struct ev_token_bucket_cfg *cfg) 676275970Scy{ 677275970Scy int same_tick; 678275970Scy if (!g || !cfg) 679275970Scy return -1; 680275970Scy 681275970Scy LOCK_GROUP(g); 682275970Scy same_tick = evutil_timercmp( 683275970Scy &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==); 684275970Scy memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 685275970Scy 686275970Scy if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum) 687275970Scy g->rate_limit.read_limit = cfg->read_maximum; 688275970Scy if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum) 689275970Scy g->rate_limit.write_limit = cfg->write_maximum; 690275970Scy 691275970Scy if (!same_tick) { 692275970Scy /* This can cause a hiccup in the schedule */ 693275970Scy event_add(&g->master_refill_event, &cfg->tick_timeout); 694275970Scy } 695275970Scy 696275970Scy /* The new limits might force us to adjust min_share differently. */ 697275970Scy bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share); 698275970Scy 699275970Scy UNLOCK_GROUP(g); 700275970Scy return 0; 701275970Scy} 702275970Scy 703275970Scyint 704275970Scybufferevent_rate_limit_group_set_min_share( 705275970Scy struct bufferevent_rate_limit_group *g, 706275970Scy size_t share) 707275970Scy{ 708275970Scy if (share > EV_SSIZE_MAX) 709275970Scy return -1; 710275970Scy 711275970Scy g->configured_min_share = share; 712275970Scy 713275970Scy /* Can't set share to less than the one-tick maximum. IOW, at steady 714275970Scy * state, at least one connection can go per tick. */ 715275970Scy if (share > g->rate_limit_cfg.read_rate) 716275970Scy share = g->rate_limit_cfg.read_rate; 717275970Scy if (share > g->rate_limit_cfg.write_rate) 718275970Scy share = g->rate_limit_cfg.write_rate; 719275970Scy 720275970Scy g->min_share = share; 721275970Scy return 0; 722275970Scy} 723275970Scy 724275970Scyvoid 725275970Scybufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g) 726275970Scy{ 727275970Scy LOCK_GROUP(g); 728275970Scy EVUTIL_ASSERT(0 == g->n_members); 729275970Scy event_del(&g->master_refill_event); 730275970Scy UNLOCK_GROUP(g); 731275970Scy EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 732275970Scy mm_free(g); 733275970Scy} 734275970Scy 735275970Scyint 736275970Scybufferevent_add_to_rate_limit_group(struct bufferevent *bev, 737275970Scy struct bufferevent_rate_limit_group *g) 738275970Scy{ 739275970Scy int wsuspend, rsuspend; 740275970Scy struct bufferevent_private *bevp = 741275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 742275970Scy BEV_LOCK(bev); 743275970Scy 744275970Scy if (!bevp->rate_limiting) { 745275970Scy struct bufferevent_rate_limit *rlim; 746275970Scy rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 747275970Scy if (!rlim) { 748275970Scy BEV_UNLOCK(bev); 749275970Scy return -1; 750275970Scy } 751275970Scy event_assign(&rlim->refill_bucket_event, bev->ev_base, 752275970Scy -1, EV_FINALIZE, bev_refill_callback_, bevp); 753275970Scy bevp->rate_limiting = rlim; 754275970Scy } 755275970Scy 756275970Scy if (bevp->rate_limiting->group == g) { 757275970Scy BEV_UNLOCK(bev); 758275970Scy return 0; 759275970Scy } 760275970Scy if (bevp->rate_limiting->group) 761275970Scy bufferevent_remove_from_rate_limit_group(bev); 762275970Scy 763275970Scy LOCK_GROUP(g); 764275970Scy bevp->rate_limiting->group = g; 765275970Scy ++g->n_members; 766275970Scy LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); 767275970Scy 768275970Scy rsuspend = g->read_suspended; 769275970Scy wsuspend = g->write_suspended; 770275970Scy 771275970Scy UNLOCK_GROUP(g); 772275970Scy 773275970Scy if (rsuspend) 774275970Scy bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); 775275970Scy if (wsuspend) 776275970Scy bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); 777275970Scy 778275970Scy BEV_UNLOCK(bev); 779275970Scy return 0; 780275970Scy} 781275970Scy 782275970Scyint 783275970Scybufferevent_remove_from_rate_limit_group(struct bufferevent *bev) 784275970Scy{ 785275970Scy return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); 786275970Scy} 787275970Scy 788275970Scyint 789275970Scybufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, 790275970Scy int unsuspend) 791275970Scy{ 792275970Scy struct bufferevent_private *bevp = 793275970Scy EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 794275970Scy BEV_LOCK(bev); 795275970Scy if (bevp->rate_limiting && bevp->rate_limiting->group) { 796275970Scy struct bufferevent_rate_limit_group *g = 797275970Scy bevp->rate_limiting->group; 798275970Scy LOCK_GROUP(g); 799275970Scy bevp->rate_limiting->group = NULL; 800275970Scy --g->n_members; 801275970Scy LIST_REMOVE(bevp, rate_limiting->next_in_group); 802275970Scy UNLOCK_GROUP(g); 803275970Scy } 804275970Scy if (unsuspend) { 805275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); 806275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); 807275970Scy } 808275970Scy BEV_UNLOCK(bev); 809275970Scy return 0; 810275970Scy} 811275970Scy 812275970Scy/* === 813275970Scy * API functions to expose rate limits. 814275970Scy * 815275970Scy * Don't use these from inside Libevent; they're meant to be for use by 816275970Scy * the program. 817275970Scy * === */ 818275970Scy 819275970Scy/* Mostly you don't want to use this function from inside libevent; 820275970Scy * bufferevent_get_read_max_() is more likely what you want*/ 821275970Scyev_ssize_t 822275970Scybufferevent_get_read_limit(struct bufferevent *bev) 823275970Scy{ 824275970Scy ev_ssize_t r; 825275970Scy struct bufferevent_private *bevp; 826275970Scy BEV_LOCK(bev); 827275970Scy bevp = BEV_UPCAST(bev); 828275970Scy if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 829275970Scy bufferevent_update_buckets(bevp); 830275970Scy r = bevp->rate_limiting->limit.read_limit; 831275970Scy } else { 832275970Scy r = EV_SSIZE_MAX; 833275970Scy } 834275970Scy BEV_UNLOCK(bev); 835275970Scy return r; 836275970Scy} 837275970Scy 838275970Scy/* Mostly you don't want to use this function from inside libevent; 839275970Scy * bufferevent_get_write_max_() is more likely what you want*/ 840275970Scyev_ssize_t 841275970Scybufferevent_get_write_limit(struct bufferevent *bev) 842275970Scy{ 843275970Scy ev_ssize_t r; 844275970Scy struct bufferevent_private *bevp; 845275970Scy BEV_LOCK(bev); 846275970Scy bevp = BEV_UPCAST(bev); 847275970Scy if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 848275970Scy bufferevent_update_buckets(bevp); 849275970Scy r = bevp->rate_limiting->limit.write_limit; 850275970Scy } else { 851275970Scy r = EV_SSIZE_MAX; 852275970Scy } 853275970Scy BEV_UNLOCK(bev); 854275970Scy return r; 855275970Scy} 856275970Scy 857275970Scyint 858275970Scybufferevent_set_max_single_read(struct bufferevent *bev, size_t size) 859275970Scy{ 860275970Scy struct bufferevent_private *bevp; 861275970Scy BEV_LOCK(bev); 862275970Scy bevp = BEV_UPCAST(bev); 863275970Scy if (size == 0 || size > EV_SSIZE_MAX) 864275970Scy bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; 865275970Scy else 866275970Scy bevp->max_single_read = size; 867275970Scy BEV_UNLOCK(bev); 868275970Scy return 0; 869275970Scy} 870275970Scy 871275970Scyint 872275970Scybufferevent_set_max_single_write(struct bufferevent *bev, size_t size) 873275970Scy{ 874275970Scy struct bufferevent_private *bevp; 875275970Scy BEV_LOCK(bev); 876275970Scy bevp = BEV_UPCAST(bev); 877275970Scy if (size == 0 || size > EV_SSIZE_MAX) 878275970Scy bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 879275970Scy else 880275970Scy bevp->max_single_write = size; 881275970Scy BEV_UNLOCK(bev); 882275970Scy return 0; 883275970Scy} 884275970Scy 885275970Scyev_ssize_t 886275970Scybufferevent_get_max_single_read(struct bufferevent *bev) 887275970Scy{ 888275970Scy ev_ssize_t r; 889275970Scy 890275970Scy BEV_LOCK(bev); 891275970Scy r = BEV_UPCAST(bev)->max_single_read; 892275970Scy BEV_UNLOCK(bev); 893275970Scy return r; 894275970Scy} 895275970Scy 896275970Scyev_ssize_t 897275970Scybufferevent_get_max_single_write(struct bufferevent *bev) 898275970Scy{ 899275970Scy ev_ssize_t r; 900275970Scy 901275970Scy BEV_LOCK(bev); 902275970Scy r = BEV_UPCAST(bev)->max_single_write; 903275970Scy BEV_UNLOCK(bev); 904275970Scy return r; 905275970Scy} 906275970Scy 907275970Scyev_ssize_t 908275970Scybufferevent_get_max_to_read(struct bufferevent *bev) 909275970Scy{ 910275970Scy ev_ssize_t r; 911275970Scy BEV_LOCK(bev); 912275970Scy r = bufferevent_get_read_max_(BEV_UPCAST(bev)); 913275970Scy BEV_UNLOCK(bev); 914275970Scy return r; 915275970Scy} 916275970Scy 917275970Scyev_ssize_t 918275970Scybufferevent_get_max_to_write(struct bufferevent *bev) 919275970Scy{ 920275970Scy ev_ssize_t r; 921275970Scy BEV_LOCK(bev); 922275970Scy r = bufferevent_get_write_max_(BEV_UPCAST(bev)); 923275970Scy BEV_UNLOCK(bev); 924275970Scy return r; 925275970Scy} 926275970Scy 927275970Scyconst struct ev_token_bucket_cfg * 928275970Scybufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { 929275970Scy struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 930275970Scy struct ev_token_bucket_cfg *cfg; 931275970Scy 932275970Scy BEV_LOCK(bev); 933275970Scy 934275970Scy if (bufev_private->rate_limiting) { 935275970Scy cfg = bufev_private->rate_limiting->cfg; 936275970Scy } else { 937275970Scy cfg = NULL; 938275970Scy } 939275970Scy 940275970Scy BEV_UNLOCK(bev); 941275970Scy 942275970Scy return cfg; 943275970Scy} 944275970Scy 945275970Scy/* Mostly you don't want to use this function from inside libevent; 946275970Scy * bufferevent_get_read_max_() is more likely what you want*/ 947275970Scyev_ssize_t 948275970Scybufferevent_rate_limit_group_get_read_limit( 949275970Scy struct bufferevent_rate_limit_group *grp) 950275970Scy{ 951275970Scy ev_ssize_t r; 952275970Scy LOCK_GROUP(grp); 953275970Scy r = grp->rate_limit.read_limit; 954275970Scy UNLOCK_GROUP(grp); 955275970Scy return r; 956275970Scy} 957275970Scy 958275970Scy/* Mostly you don't want to use this function from inside libevent; 959275970Scy * bufferevent_get_write_max_() is more likely what you want. */ 960275970Scyev_ssize_t 961275970Scybufferevent_rate_limit_group_get_write_limit( 962275970Scy struct bufferevent_rate_limit_group *grp) 963275970Scy{ 964275970Scy ev_ssize_t r; 965275970Scy LOCK_GROUP(grp); 966275970Scy r = grp->rate_limit.write_limit; 967275970Scy UNLOCK_GROUP(grp); 968275970Scy return r; 969275970Scy} 970275970Scy 971275970Scyint 972275970Scybufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) 973275970Scy{ 974275970Scy int r = 0; 975275970Scy ev_ssize_t old_limit, new_limit; 976275970Scy struct bufferevent_private *bevp; 977275970Scy BEV_LOCK(bev); 978275970Scy bevp = BEV_UPCAST(bev); 979275970Scy EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 980275970Scy old_limit = bevp->rate_limiting->limit.read_limit; 981275970Scy 982275970Scy new_limit = (bevp->rate_limiting->limit.read_limit -= decr); 983275970Scy if (old_limit > 0 && new_limit <= 0) { 984275970Scy bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 985275970Scy if (event_add(&bevp->rate_limiting->refill_bucket_event, 986275970Scy &bevp->rate_limiting->cfg->tick_timeout) < 0) 987275970Scy r = -1; 988275970Scy } else if (old_limit <= 0 && new_limit > 0) { 989275970Scy if (!(bevp->write_suspended & BEV_SUSPEND_BW)) 990275970Scy event_del(&bevp->rate_limiting->refill_bucket_event); 991275970Scy bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 992275970Scy } 993275970Scy 994275970Scy BEV_UNLOCK(bev); 995275970Scy return r; 996275970Scy} 997275970Scy 998275970Scyint 999275970Scybufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) 1000275970Scy{ 1001275970Scy /* XXXX this is mostly copy-and-paste from 1002275970Scy * bufferevent_decrement_read_limit */ 1003275970Scy int r = 0; 1004275970Scy ev_ssize_t old_limit, new_limit; 1005275970Scy struct bufferevent_private *bevp; 1006275970Scy BEV_LOCK(bev); 1007275970Scy bevp = BEV_UPCAST(bev); 1008275970Scy EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 1009275970Scy old_limit = bevp->rate_limiting->limit.write_limit; 1010275970Scy 1011275970Scy new_limit = (bevp->rate_limiting->limit.write_limit -= decr); 1012275970Scy if (old_limit > 0 && new_limit <= 0) { 1013275970Scy bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 1014275970Scy if (event_add(&bevp->rate_limiting->refill_bucket_event, 1015275970Scy &bevp->rate_limiting->cfg->tick_timeout) < 0) 1016275970Scy r = -1; 1017275970Scy } else if (old_limit <= 0 && new_limit > 0) { 1018275970Scy if (!(bevp->read_suspended & BEV_SUSPEND_BW)) 1019275970Scy event_del(&bevp->rate_limiting->refill_bucket_event); 1020275970Scy bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 1021275970Scy } 1022275970Scy 1023275970Scy BEV_UNLOCK(bev); 1024275970Scy return r; 1025275970Scy} 1026275970Scy 1027275970Scyint 1028275970Scybufferevent_rate_limit_group_decrement_read( 1029275970Scy struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1030275970Scy{ 1031275970Scy int r = 0; 1032275970Scy ev_ssize_t old_limit, new_limit; 1033275970Scy LOCK_GROUP(grp); 1034275970Scy old_limit = grp->rate_limit.read_limit; 1035275970Scy new_limit = (grp->rate_limit.read_limit -= decr); 1036275970Scy 1037275970Scy if (old_limit > 0 && new_limit <= 0) { 1038275970Scy bev_group_suspend_reading_(grp); 1039275970Scy } else if (old_limit <= 0 && new_limit > 0) { 1040275970Scy bev_group_unsuspend_reading_(grp); 1041275970Scy } 1042275970Scy 1043275970Scy UNLOCK_GROUP(grp); 1044275970Scy return r; 1045275970Scy} 1046275970Scy 1047275970Scyint 1048275970Scybufferevent_rate_limit_group_decrement_write( 1049275970Scy struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1050275970Scy{ 1051275970Scy int r = 0; 1052275970Scy ev_ssize_t old_limit, new_limit; 1053275970Scy LOCK_GROUP(grp); 1054275970Scy old_limit = grp->rate_limit.write_limit; 1055275970Scy new_limit = (grp->rate_limit.write_limit -= decr); 1056275970Scy 1057275970Scy if (old_limit > 0 && new_limit <= 0) { 1058275970Scy bev_group_suspend_writing_(grp); 1059275970Scy } else if (old_limit <= 0 && new_limit > 0) { 1060275970Scy bev_group_unsuspend_writing_(grp); 1061275970Scy } 1062275970Scy 1063275970Scy UNLOCK_GROUP(grp); 1064275970Scy return r; 1065275970Scy} 1066275970Scy 1067275970Scyvoid 1068275970Scybufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp, 1069275970Scy ev_uint64_t *total_read_out, ev_uint64_t *total_written_out) 1070275970Scy{ 1071275970Scy EVUTIL_ASSERT(grp != NULL); 1072275970Scy if (total_read_out) 1073275970Scy *total_read_out = grp->total_read; 1074275970Scy if (total_written_out) 1075275970Scy *total_written_out = grp->total_written; 1076275970Scy} 1077275970Scy 1078275970Scyvoid 1079275970Scybufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp) 1080275970Scy{ 1081275970Scy grp->total_read = grp->total_written = 0; 1082275970Scy} 1083275970Scy 1084275970Scyint 1085275970Scybufferevent_ratelim_init_(struct bufferevent_private *bev) 1086275970Scy{ 1087275970Scy bev->rate_limiting = NULL; 1088275970Scy bev->max_single_read = MAX_SINGLE_READ_DEFAULT; 1089275970Scy bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 1090275970Scy 1091275970Scy return 0; 1092275970Scy} 1093