1/* $NetBSD: bufferevent_ratelim.c,v 1.5 2020/05/25 20:47:33 christos Exp $ */ 2 3/* 4 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 5 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. The name of the author may not be used to endorse or promote products 17 * derived from this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 21 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 22 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 24 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30#include "evconfig-private.h" 31 32#include <sys/types.h> 33#include <limits.h> 34#include <string.h> 35#include <stdlib.h> 36 37#include "event2/event.h" 38#include "event2/event_struct.h" 39#include "event2/util.h" 40#include "event2/bufferevent.h" 41#include "event2/bufferevent_struct.h" 42#include "event2/buffer.h" 43 44#include "ratelim-internal.h" 45 46#include "bufferevent-internal.h" 47#include "mm-internal.h" 48#include "util-internal.h" 49#include "event-internal.h" 50 51int 52ev_token_bucket_init_(struct ev_token_bucket *bucket, 53 const struct ev_token_bucket_cfg *cfg, 54 ev_uint32_t current_tick, 55 int reinitialize) 56{ 57 if (reinitialize) { 58 /* on reinitialization, we only clip downwards, since we've 59 already used who-knows-how-much bandwidth this tick. We 60 leave "last_updated" as it is; the next update will add the 61 appropriate amount of bandwidth to the bucket. 62 */ 63 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) 64 bucket->read_limit = cfg->read_maximum; 65 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) 66 bucket->write_limit = cfg->write_maximum; 67 } else { 68 bucket->read_limit = cfg->read_rate; 69 bucket->write_limit = cfg->write_rate; 70 bucket->last_updated = current_tick; 71 } 72 return 0; 73} 74 75int 76ev_token_bucket_update_(struct ev_token_bucket *bucket, 77 const struct ev_token_bucket_cfg *cfg, 78 ev_uint32_t current_tick) 79{ 80 /* It's okay if the tick number overflows, since we'll just 81 * wrap around when we do the unsigned substraction. */ 82 unsigned n_ticks = current_tick - bucket->last_updated; 83 84 /* Make sure some ticks actually happened, and that time didn't 85 * roll back. */ 86 if (n_ticks == 0 || n_ticks > INT_MAX) 87 return 0; 88 89 /* Naively, we would say 90 bucket->limit += n_ticks * cfg->rate; 91 92 if (bucket->limit > cfg->maximum) 93 bucket->limit = cfg->maximum; 94 95 But we're worried about overflow, so we do it like this: 96 */ 97 98 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate) 99 bucket->read_limit = cfg->read_maximum; 100 else 101 bucket->read_limit += n_ticks * cfg->read_rate; 102 103 104 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate) 105 bucket->write_limit = cfg->write_maximum; 106 else 107 bucket->write_limit += n_ticks * cfg->write_rate; 108 109 110 bucket->last_updated = current_tick; 111 112 return 1; 113} 114 115static inline void 116bufferevent_update_buckets(struct bufferevent_private *bev) 117{ 118 /* Must hold lock on bev. */ 119 struct timeval now; 120 unsigned tick; 121 event_base_gettimeofday_cached(bev->bev.ev_base, &now); 122 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); 123 if (tick != bev->rate_limiting->limit.last_updated) 124 ev_token_bucket_update_(&bev->rate_limiting->limit, 125 bev->rate_limiting->cfg, tick); 126} 127 128ev_uint32_t 129ev_token_bucket_get_tick_(const struct timeval *tv, 130 const struct ev_token_bucket_cfg *cfg) 131{ 132 /* This computation uses two multiplies and a divide. We could do 133 * fewer if we knew that the tick length was an integer number of 134 * seconds, or if we knew it divided evenly into a second. We should 135 * investigate that more. 136 */ 137 138 /* We cast to an ev_uint64_t first, since we don't want to overflow 139 * before we do the final divide. */ 140 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000; 141 return (unsigned)(msec / cfg->msec_per_tick); 142} 143 144struct ev_token_bucket_cfg * 145ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, 146 size_t write_rate, size_t write_burst, 147 const struct timeval *tick_len) 148{ 149 struct ev_token_bucket_cfg *r; 150 struct timeval g; 151 if (! tick_len) { 152 g.tv_sec = 1; 153 g.tv_usec = 0; 154 tick_len = &g; 155 } 156 if (read_rate > read_burst || write_rate > write_burst || 157 read_rate < 1 || write_rate < 1) 158 return NULL; 159 if (read_rate > EV_RATE_LIMIT_MAX || 160 write_rate > EV_RATE_LIMIT_MAX || 161 read_burst > EV_RATE_LIMIT_MAX || 162 write_burst > EV_RATE_LIMIT_MAX) 163 return NULL; 164 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); 165 if (!r) 166 return NULL; 167 r->read_rate = read_rate; 168 r->write_rate = write_rate; 169 r->read_maximum = read_burst; 170 r->write_maximum = write_burst; 171 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); 172 r->msec_per_tick = (tick_len->tv_sec * 1000) + 173 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; 174 return r; 175} 176 177void 178ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) 179{ 180 mm_free(cfg); 181} 182 183/* Default values for max_single_read & max_single_write variables. */ 184#define MAX_SINGLE_READ_DEFAULT 16384 185#define MAX_SINGLE_WRITE_DEFAULT 16384 186 187#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) 188#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) 189 190static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g); 191static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g); 192static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g); 193static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g); 194 195/** Helper: figure out the maximum amount we should write if is_write, or 196 the maximum amount we should read if is_read. Return that maximum, or 197 0 if our bucket is wholly exhausted. 198 */ 199static inline ev_ssize_t 200bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write) 201{ 202 /* needs lock on bev. */ 203 ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; 204 205#define LIM(x) \ 206 (is_write ? (x).write_limit : (x).read_limit) 207 208#define GROUP_SUSPENDED(g) \ 209 (is_write ? (g)->write_suspended : (g)->read_suspended) 210 211 /* Sets max_so_far to MIN(x, max_so_far) */ 212#define CLAMPTO(x) \ 213 do { \ 214 if (max_so_far > (x)) \ 215 max_so_far = (x); \ 216 } while (0); 217 218 if (!bev->rate_limiting) 219 return max_so_far; 220 221 /* If rate-limiting is enabled at all, update the appropriate 222 bucket, and take the smaller of our rate limit and the group 223 rate limit. 224 */ 225 226 if (bev->rate_limiting->cfg) { 227 bufferevent_update_buckets(bev); 228 max_so_far = LIM(bev->rate_limiting->limit); 229 } 230 if (bev->rate_limiting->group) { 231 struct bufferevent_rate_limit_group *g = 232 bev->rate_limiting->group; 233 ev_ssize_t share; 234 LOCK_GROUP(g); 235 if (GROUP_SUSPENDED(g)) { 236 /* We can get here if we failed to lock this 237 * particular bufferevent while suspending the whole 238 * group. */ 239 if (is_write) 240 bufferevent_suspend_write_(&bev->bev, 241 BEV_SUSPEND_BW_GROUP); 242 else 243 bufferevent_suspend_read_(&bev->bev, 244 BEV_SUSPEND_BW_GROUP); 245 share = 0; 246 } else { 247 /* XXXX probably we should divide among the active 248 * members, not the total members. */ 249 share = LIM(g->rate_limit) / g->n_members; 250 if (share < g->min_share) 251 share = g->min_share; 252 } 253 UNLOCK_GROUP(g); 254 CLAMPTO(share); 255 } 256 257 if (max_so_far < 0) 258 max_so_far = 0; 259 return max_so_far; 260} 261 262ev_ssize_t 263bufferevent_get_read_max_(struct bufferevent_private *bev) 264{ 265 return bufferevent_get_rlim_max_(bev, 0); 266} 267 268ev_ssize_t 269bufferevent_get_write_max_(struct bufferevent_private *bev) 270{ 271 return bufferevent_get_rlim_max_(bev, 1); 272} 273 274int 275bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 276{ 277 /* XXXXX Make sure all users of this function check its return value */ 278 int r = 0; 279 /* need to hold lock on bev */ 280 if (!bev->rate_limiting) 281 return 0; 282 283 if (bev->rate_limiting->cfg) { 284 bev->rate_limiting->limit.read_limit -= bytes; 285 if (bev->rate_limiting->limit.read_limit <= 0) { 286 bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW); 287 if (event_add(&bev->rate_limiting->refill_bucket_event, 288 &bev->rate_limiting->cfg->tick_timeout) < 0) 289 r = -1; 290 } else if (bev->read_suspended & BEV_SUSPEND_BW) { 291 if (!(bev->write_suspended & BEV_SUSPEND_BW)) 292 event_del(&bev->rate_limiting->refill_bucket_event); 293 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 294 } 295 } 296 297 if (bev->rate_limiting->group) { 298 LOCK_GROUP(bev->rate_limiting->group); 299 bev->rate_limiting->group->rate_limit.read_limit -= bytes; 300 bev->rate_limiting->group->total_read += bytes; 301 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { 302 bev_group_suspend_reading_(bev->rate_limiting->group); 303 } else if (bev->rate_limiting->group->read_suspended) { 304 bev_group_unsuspend_reading_(bev->rate_limiting->group); 305 } 306 UNLOCK_GROUP(bev->rate_limiting->group); 307 } 308 309 return r; 310} 311 312int 313bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) 314{ 315 /* XXXXX Make sure all users of this function check its return value */ 316 int r = 0; 317 /* need to hold lock */ 318 if (!bev->rate_limiting) 319 return 0; 320 321 if (bev->rate_limiting->cfg) { 322 bev->rate_limiting->limit.write_limit -= bytes; 323 if (bev->rate_limiting->limit.write_limit <= 0) { 324 bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW); 325 if (event_add(&bev->rate_limiting->refill_bucket_event, 326 &bev->rate_limiting->cfg->tick_timeout) < 0) 327 r = -1; 328 } else if (bev->write_suspended & BEV_SUSPEND_BW) { 329 if (!(bev->read_suspended & BEV_SUSPEND_BW)) 330 event_del(&bev->rate_limiting->refill_bucket_event); 331 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 332 } 333 } 334 335 if (bev->rate_limiting->group) { 336 LOCK_GROUP(bev->rate_limiting->group); 337 bev->rate_limiting->group->rate_limit.write_limit -= bytes; 338 bev->rate_limiting->group->total_written += bytes; 339 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { 340 bev_group_suspend_writing_(bev->rate_limiting->group); 341 } else if (bev->rate_limiting->group->write_suspended) { 342 bev_group_unsuspend_writing_(bev->rate_limiting->group); 343 } 344 UNLOCK_GROUP(bev->rate_limiting->group); 345 } 346 347 return r; 348} 349 350/** Stop reading on every bufferevent in <b>g</b> */ 351static int 352bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g) 353{ 354 /* Needs group lock */ 355 struct bufferevent_private *bev; 356 g->read_suspended = 1; 357 g->pending_unsuspend_read = 0; 358 359 /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK, 360 to prevent a deadlock. (Ordinarily, the group lock nests inside 361 the bufferevent locks. If we are unable to lock any individual 362 bufferevent, it will find out later when it looks at its limit 363 and sees that its group is suspended.) 364 */ 365 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 366 if (EVLOCK_TRY_LOCK_(bev->lock)) { 367 bufferevent_suspend_read_(&bev->bev, 368 BEV_SUSPEND_BW_GROUP); 369 EVLOCK_UNLOCK(bev->lock, 0); 370 } 371 } 372 return 0; 373} 374 375/** Stop writing on every bufferevent in <b>g</b> */ 376static int 377bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g) 378{ 379 /* Needs group lock */ 380 struct bufferevent_private *bev; 381 g->write_suspended = 1; 382 g->pending_unsuspend_write = 0; 383 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { 384 if (EVLOCK_TRY_LOCK_(bev->lock)) { 385 bufferevent_suspend_write_(&bev->bev, 386 BEV_SUSPEND_BW_GROUP); 387 EVLOCK_UNLOCK(bev->lock, 0); 388 } 389 } 390 return 0; 391} 392 393/** Timer callback invoked on a single bufferevent with one or more exhausted 394 buckets when they are ready to refill. */ 395static void 396bev_refill_callback_(evutil_socket_t fd, short what, void *arg) 397{ 398 unsigned tick; 399 struct timeval now; 400 struct bufferevent_private *bev = arg; 401 int again = 0; 402 BEV_LOCK(&bev->bev); 403 if (!bev->rate_limiting || !bev->rate_limiting->cfg) { 404 BEV_UNLOCK(&bev->bev); 405 return; 406 } 407 408 /* First, update the bucket */ 409 event_base_gettimeofday_cached(bev->bev.ev_base, &now); 410 tick = ev_token_bucket_get_tick_(&now, 411 bev->rate_limiting->cfg); 412 ev_token_bucket_update_(&bev->rate_limiting->limit, 413 bev->rate_limiting->cfg, 414 tick); 415 416 /* Now unsuspend any read/write operations as appropriate. */ 417 if ((bev->read_suspended & BEV_SUSPEND_BW)) { 418 if (bev->rate_limiting->limit.read_limit > 0) 419 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); 420 else 421 again = 1; 422 } 423 if ((bev->write_suspended & BEV_SUSPEND_BW)) { 424 if (bev->rate_limiting->limit.write_limit > 0) 425 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); 426 else 427 again = 1; 428 } 429 if (again) { 430 /* One or more of the buckets may need another refill if they 431 started negative. 432 433 XXXX if we need to be quiet for more ticks, we should 434 maybe figure out what timeout we really want. 435 */ 436 /* XXXX Handle event_add failure somehow */ 437 event_add(&bev->rate_limiting->refill_bucket_event, 438 &bev->rate_limiting->cfg->tick_timeout); 439 } 440 BEV_UNLOCK(&bev->bev); 441} 442 443/** Helper: grab a random element from a bufferevent group. 444 * 445 * Requires that we hold the lock on the group. 446 */ 447static struct bufferevent_private * 448bev_group_random_element_(struct bufferevent_rate_limit_group *group) 449{ 450 int which; 451 struct bufferevent_private *bev; 452 453 /* requires group lock */ 454 455 if (!group->n_members) 456 return NULL; 457 458 EVUTIL_ASSERT(! LIST_EMPTY(&group->members)); 459 460 which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members); 461 462 bev = LIST_FIRST(&group->members); 463 while (which--) 464 bev = LIST_NEXT(bev, rate_limiting->next_in_group); 465 466 return bev; 467} 468 469/** Iterate over the elements of a rate-limiting group 'g' with a random 470 starting point, assigning each to the variable 'bev', and executing the 471 block 'block'. 472 473 We do this in a half-baked effort to get fairness among group members. 474 XXX Round-robin or some kind of priority queue would be even more fair. 475 */ 476#define FOREACH_RANDOM_ORDER(block) \ 477 do { \ 478 first = bev_group_random_element_(g); \ 479 for (bev = first; bev != LIST_END(&g->members); \ 480 bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 481 block ; \ 482 } \ 483 for (bev = LIST_FIRST(&g->members); bev && bev != first; \ 484 bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ 485 block ; \ 486 } \ 487 } while (0) 488 489static void 490bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g) 491{ 492 int again = 0; 493 struct bufferevent_private *bev, *first; 494 495 g->read_suspended = 0; 496 FOREACH_RANDOM_ORDER({ 497 if (EVLOCK_TRY_LOCK_(bev->lock)) { 498 bufferevent_unsuspend_read_(&bev->bev, 499 BEV_SUSPEND_BW_GROUP); 500 EVLOCK_UNLOCK(bev->lock, 0); 501 } else { 502 again = 1; 503 } 504 }); 505 g->pending_unsuspend_read = again; 506} 507 508static void 509bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g) 510{ 511 int again = 0; 512 struct bufferevent_private *bev, *first; 513 g->write_suspended = 0; 514 515 FOREACH_RANDOM_ORDER({ 516 if (EVLOCK_TRY_LOCK_(bev->lock)) { 517 bufferevent_unsuspend_write_(&bev->bev, 518 BEV_SUSPEND_BW_GROUP); 519 EVLOCK_UNLOCK(bev->lock, 0); 520 } else { 521 again = 1; 522 } 523 }); 524 g->pending_unsuspend_write = again; 525} 526 527/** Callback invoked every tick to add more elements to the group bucket 528 and unsuspend group members as needed. 529 */ 530static void 531bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg) 532{ 533 struct bufferevent_rate_limit_group *g = arg; 534 unsigned tick; 535 struct timeval now; 536 537 event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); 538 539 LOCK_GROUP(g); 540 541 tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg); 542 ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick); 543 544 if (g->pending_unsuspend_read || 545 (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { 546 bev_group_unsuspend_reading_(g); 547 } 548 if (g->pending_unsuspend_write || 549 (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ 550 bev_group_unsuspend_writing_(g); 551 } 552 553 /* XXXX Rather than waiting to the next tick to unsuspend stuff 554 * with pending_unsuspend_write/read, we should do it on the 555 * next iteration of the mainloop. 556 */ 557 558 UNLOCK_GROUP(g); 559} 560 561int 562bufferevent_set_rate_limit(struct bufferevent *bev, 563 struct ev_token_bucket_cfg *cfg) 564{ 565 struct bufferevent_private *bevp = 566 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 567 int r = -1; 568 struct bufferevent_rate_limit *rlim; 569 struct timeval now; 570 ev_uint32_t tick; 571 int reinit = 0, suspended = 0; 572 /* XXX reference-count cfg */ 573 574 BEV_LOCK(bev); 575 576 if (cfg == NULL) { 577 if (bevp->rate_limiting) { 578 rlim = bevp->rate_limiting; 579 rlim->cfg = NULL; 580 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 581 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 582 if (event_initialized(&rlim->refill_bucket_event)) 583 event_del(&rlim->refill_bucket_event); 584 } 585 r = 0; 586 goto done; 587 } 588 589 event_base_gettimeofday_cached(bev->ev_base, &now); 590 tick = ev_token_bucket_get_tick_(&now, cfg); 591 592 if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { 593 /* no-op */ 594 r = 0; 595 goto done; 596 } 597 if (bevp->rate_limiting == NULL) { 598 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 599 if (!rlim) 600 goto done; 601 bevp->rate_limiting = rlim; 602 } else { 603 rlim = bevp->rate_limiting; 604 } 605 reinit = rlim->cfg != NULL; 606 607 rlim->cfg = cfg; 608 ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit); 609 610 if (reinit) { 611 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); 612 event_del(&rlim->refill_bucket_event); 613 } 614 event_assign(&rlim->refill_bucket_event, bev->ev_base, 615 -1, EV_FINALIZE, bev_refill_callback_, bevp); 616 617 if (rlim->limit.read_limit > 0) { 618 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 619 } else { 620 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 621 suspended=1; 622 } 623 if (rlim->limit.write_limit > 0) { 624 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 625 } else { 626 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 627 suspended = 1; 628 } 629 630 if (suspended) 631 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout); 632 633 r = 0; 634 635done: 636 BEV_UNLOCK(bev); 637 return r; 638} 639 640struct bufferevent_rate_limit_group * 641bufferevent_rate_limit_group_new(struct event_base *base, 642 const struct ev_token_bucket_cfg *cfg) 643{ 644 struct bufferevent_rate_limit_group *g; 645 struct timeval now; 646 ev_uint32_t tick; 647 648 event_base_gettimeofday_cached(base, &now); 649 tick = ev_token_bucket_get_tick_(&now, cfg); 650 651 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); 652 if (!g) 653 return NULL; 654 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 655 LIST_INIT(&g->members); 656 657 ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); 658 659 event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, 660 bev_group_refill_callback_, g); 661 /*XXXX handle event_add failure */ 662 event_add(&g->master_refill_event, &cfg->tick_timeout); 663 664 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 665 666 bufferevent_rate_limit_group_set_min_share(g, 64); 667 668 evutil_weakrand_seed_(&g->weakrand_seed, 669 (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g)); 670 671 return g; 672} 673 674int 675bufferevent_rate_limit_group_set_cfg( 676 struct bufferevent_rate_limit_group *g, 677 const struct ev_token_bucket_cfg *cfg) 678{ 679 int same_tick; 680 if (!g || !cfg) 681 return -1; 682 683 LOCK_GROUP(g); 684 same_tick = evutil_timercmp( 685 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==); 686 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); 687 688 if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum) 689 g->rate_limit.read_limit = cfg->read_maximum; 690 if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum) 691 g->rate_limit.write_limit = cfg->write_maximum; 692 693 if (!same_tick) { 694 /* This can cause a hiccup in the schedule */ 695 event_add(&g->master_refill_event, &cfg->tick_timeout); 696 } 697 698 /* The new limits might force us to adjust min_share differently. */ 699 bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share); 700 701 UNLOCK_GROUP(g); 702 return 0; 703} 704 705int 706bufferevent_rate_limit_group_set_min_share( 707 struct bufferevent_rate_limit_group *g, 708 size_t share) 709{ 710 if (share > EV_SSIZE_MAX) 711 return -1; 712 713 g->configured_min_share = share; 714 715 /* Can't set share to less than the one-tick maximum. IOW, at steady 716 * state, at least one connection can go per tick. */ 717 if (share > g->rate_limit_cfg.read_rate) 718 share = g->rate_limit_cfg.read_rate; 719 if (share > g->rate_limit_cfg.write_rate) 720 share = g->rate_limit_cfg.write_rate; 721 722 g->min_share = share; 723 return 0; 724} 725 726void 727bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g) 728{ 729 LOCK_GROUP(g); 730 EVUTIL_ASSERT(0 == g->n_members); 731 event_del(&g->master_refill_event); 732 UNLOCK_GROUP(g); 733 EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); 734 mm_free(g); 735} 736 737int 738bufferevent_add_to_rate_limit_group(struct bufferevent *bev, 739 struct bufferevent_rate_limit_group *g) 740{ 741 int wsuspend, rsuspend; 742 struct bufferevent_private *bevp = 743 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 744 BEV_LOCK(bev); 745 746 if (!bevp->rate_limiting) { 747 struct bufferevent_rate_limit *rlim; 748 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); 749 if (!rlim) { 750 BEV_UNLOCK(bev); 751 return -1; 752 } 753 event_assign(&rlim->refill_bucket_event, bev->ev_base, 754 -1, EV_FINALIZE, bev_refill_callback_, bevp); 755 bevp->rate_limiting = rlim; 756 } 757 758 if (bevp->rate_limiting->group == g) { 759 BEV_UNLOCK(bev); 760 return 0; 761 } 762 if (bevp->rate_limiting->group) 763 bufferevent_remove_from_rate_limit_group(bev); 764 765 LOCK_GROUP(g); 766 bevp->rate_limiting->group = g; 767 ++g->n_members; 768 LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); 769 770 rsuspend = g->read_suspended; 771 wsuspend = g->write_suspended; 772 773 UNLOCK_GROUP(g); 774 775 if (rsuspend) 776 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); 777 if (wsuspend) 778 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); 779 780 BEV_UNLOCK(bev); 781 return 0; 782} 783 784int 785bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) 786{ 787 return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); 788} 789 790int 791bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, 792 int unsuspend) 793{ 794 struct bufferevent_private *bevp = 795 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); 796 BEV_LOCK(bev); 797 if (bevp->rate_limiting && bevp->rate_limiting->group) { 798 struct bufferevent_rate_limit_group *g = 799 bevp->rate_limiting->group; 800 LOCK_GROUP(g); 801 bevp->rate_limiting->group = NULL; 802 --g->n_members; 803 LIST_REMOVE(bevp, rate_limiting->next_in_group); 804 UNLOCK_GROUP(g); 805 } 806 if (unsuspend) { 807 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); 808 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); 809 } 810 BEV_UNLOCK(bev); 811 return 0; 812} 813 814/* === 815 * API functions to expose rate limits. 816 * 817 * Don't use these from inside Libevent; they're meant to be for use by 818 * the program. 819 * === */ 820 821/* Mostly you don't want to use this function from inside libevent; 822 * bufferevent_get_read_max_() is more likely what you want*/ 823ev_ssize_t 824bufferevent_get_read_limit(struct bufferevent *bev) 825{ 826 ev_ssize_t r; 827 struct bufferevent_private *bevp; 828 BEV_LOCK(bev); 829 bevp = BEV_UPCAST(bev); 830 if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 831 bufferevent_update_buckets(bevp); 832 r = bevp->rate_limiting->limit.read_limit; 833 } else { 834 r = EV_SSIZE_MAX; 835 } 836 BEV_UNLOCK(bev); 837 return r; 838} 839 840/* Mostly you don't want to use this function from inside libevent; 841 * bufferevent_get_write_max_() is more likely what you want*/ 842ev_ssize_t 843bufferevent_get_write_limit(struct bufferevent *bev) 844{ 845 ev_ssize_t r; 846 struct bufferevent_private *bevp; 847 BEV_LOCK(bev); 848 bevp = BEV_UPCAST(bev); 849 if (bevp->rate_limiting && bevp->rate_limiting->cfg) { 850 bufferevent_update_buckets(bevp); 851 r = bevp->rate_limiting->limit.write_limit; 852 } else { 853 r = EV_SSIZE_MAX; 854 } 855 BEV_UNLOCK(bev); 856 return r; 857} 858 859int 860bufferevent_set_max_single_read(struct bufferevent *bev, size_t size) 861{ 862 struct bufferevent_private *bevp; 863 BEV_LOCK(bev); 864 bevp = BEV_UPCAST(bev); 865 if (size == 0 || size > EV_SSIZE_MAX) 866 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; 867 else 868 bevp->max_single_read = size; 869 BEV_UNLOCK(bev); 870 return 0; 871} 872 873int 874bufferevent_set_max_single_write(struct bufferevent *bev, size_t size) 875{ 876 struct bufferevent_private *bevp; 877 BEV_LOCK(bev); 878 bevp = BEV_UPCAST(bev); 879 if (size == 0 || size > EV_SSIZE_MAX) 880 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 881 else 882 bevp->max_single_write = size; 883 BEV_UNLOCK(bev); 884 return 0; 885} 886 887ev_ssize_t 888bufferevent_get_max_single_read(struct bufferevent *bev) 889{ 890 ev_ssize_t r; 891 892 BEV_LOCK(bev); 893 r = BEV_UPCAST(bev)->max_single_read; 894 BEV_UNLOCK(bev); 895 return r; 896} 897 898ev_ssize_t 899bufferevent_get_max_single_write(struct bufferevent *bev) 900{ 901 ev_ssize_t r; 902 903 BEV_LOCK(bev); 904 r = BEV_UPCAST(bev)->max_single_write; 905 BEV_UNLOCK(bev); 906 return r; 907} 908 909ev_ssize_t 910bufferevent_get_max_to_read(struct bufferevent *bev) 911{ 912 ev_ssize_t r; 913 BEV_LOCK(bev); 914 r = bufferevent_get_read_max_(BEV_UPCAST(bev)); 915 BEV_UNLOCK(bev); 916 return r; 917} 918 919ev_ssize_t 920bufferevent_get_max_to_write(struct bufferevent *bev) 921{ 922 ev_ssize_t r; 923 BEV_LOCK(bev); 924 r = bufferevent_get_write_max_(BEV_UPCAST(bev)); 925 BEV_UNLOCK(bev); 926 return r; 927} 928 929const struct ev_token_bucket_cfg * 930bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { 931 struct bufferevent_private *bufev_private = BEV_UPCAST(bev); 932 struct ev_token_bucket_cfg *cfg; 933 934 BEV_LOCK(bev); 935 936 if (bufev_private->rate_limiting) { 937 cfg = bufev_private->rate_limiting->cfg; 938 } else { 939 cfg = NULL; 940 } 941 942 BEV_UNLOCK(bev); 943 944 return cfg; 945} 946 947/* Mostly you don't want to use this function from inside libevent; 948 * bufferevent_get_read_max_() is more likely what you want*/ 949ev_ssize_t 950bufferevent_rate_limit_group_get_read_limit( 951 struct bufferevent_rate_limit_group *grp) 952{ 953 ev_ssize_t r; 954 LOCK_GROUP(grp); 955 r = grp->rate_limit.read_limit; 956 UNLOCK_GROUP(grp); 957 return r; 958} 959 960/* Mostly you don't want to use this function from inside libevent; 961 * bufferevent_get_write_max_() is more likely what you want. */ 962ev_ssize_t 963bufferevent_rate_limit_group_get_write_limit( 964 struct bufferevent_rate_limit_group *grp) 965{ 966 ev_ssize_t r; 967 LOCK_GROUP(grp); 968 r = grp->rate_limit.write_limit; 969 UNLOCK_GROUP(grp); 970 return r; 971} 972 973int 974bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) 975{ 976 int r = 0; 977 ev_ssize_t old_limit, new_limit; 978 struct bufferevent_private *bevp; 979 BEV_LOCK(bev); 980 bevp = BEV_UPCAST(bev); 981 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 982 old_limit = bevp->rate_limiting->limit.read_limit; 983 984 new_limit = (bevp->rate_limiting->limit.read_limit -= decr); 985 if (old_limit > 0 && new_limit <= 0) { 986 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); 987 if (event_add(&bevp->rate_limiting->refill_bucket_event, 988 &bevp->rate_limiting->cfg->tick_timeout) < 0) 989 r = -1; 990 } else if (old_limit <= 0 && new_limit > 0) { 991 if (!(bevp->write_suspended & BEV_SUSPEND_BW)) 992 event_del(&bevp->rate_limiting->refill_bucket_event); 993 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); 994 } 995 996 BEV_UNLOCK(bev); 997 return r; 998} 999 1000int 1001bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) 1002{ 1003 /* XXXX this is mostly copy-and-paste from 1004 * bufferevent_decrement_read_limit */ 1005 int r = 0; 1006 ev_ssize_t old_limit, new_limit; 1007 struct bufferevent_private *bevp; 1008 BEV_LOCK(bev); 1009 bevp = BEV_UPCAST(bev); 1010 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg); 1011 old_limit = bevp->rate_limiting->limit.write_limit; 1012 1013 new_limit = (bevp->rate_limiting->limit.write_limit -= decr); 1014 if (old_limit > 0 && new_limit <= 0) { 1015 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); 1016 if (event_add(&bevp->rate_limiting->refill_bucket_event, 1017 &bevp->rate_limiting->cfg->tick_timeout) < 0) 1018 r = -1; 1019 } else if (old_limit <= 0 && new_limit > 0) { 1020 if (!(bevp->read_suspended & BEV_SUSPEND_BW)) 1021 event_del(&bevp->rate_limiting->refill_bucket_event); 1022 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); 1023 } 1024 1025 BEV_UNLOCK(bev); 1026 return r; 1027} 1028 1029int 1030bufferevent_rate_limit_group_decrement_read( 1031 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1032{ 1033 int r = 0; 1034 ev_ssize_t old_limit, new_limit; 1035 LOCK_GROUP(grp); 1036 old_limit = grp->rate_limit.read_limit; 1037 new_limit = (grp->rate_limit.read_limit -= decr); 1038 1039 if (old_limit > 0 && new_limit <= 0) { 1040 bev_group_suspend_reading_(grp); 1041 } else if (old_limit <= 0 && new_limit > 0) { 1042 bev_group_unsuspend_reading_(grp); 1043 } 1044 1045 UNLOCK_GROUP(grp); 1046 return r; 1047} 1048 1049int 1050bufferevent_rate_limit_group_decrement_write( 1051 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) 1052{ 1053 int r = 0; 1054 ev_ssize_t old_limit, new_limit; 1055 LOCK_GROUP(grp); 1056 old_limit = grp->rate_limit.write_limit; 1057 new_limit = (grp->rate_limit.write_limit -= decr); 1058 1059 if (old_limit > 0 && new_limit <= 0) { 1060 bev_group_suspend_writing_(grp); 1061 } else if (old_limit <= 0 && new_limit > 0) { 1062 bev_group_unsuspend_writing_(grp); 1063 } 1064 1065 UNLOCK_GROUP(grp); 1066 return r; 1067} 1068 1069void 1070bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp, 1071 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out) 1072{ 1073 EVUTIL_ASSERT(grp != NULL); 1074 if (total_read_out) 1075 *total_read_out = grp->total_read; 1076 if (total_written_out) 1077 *total_written_out = grp->total_written; 1078} 1079 1080void 1081bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp) 1082{ 1083 grp->total_read = grp->total_written = 0; 1084} 1085 1086int 1087bufferevent_ratelim_init_(struct bufferevent_private *bev) 1088{ 1089 bev->rate_limiting = NULL; 1090 bev->max_single_read = MAX_SINGLE_READ_DEFAULT; 1091 bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; 1092 1093 return 0; 1094} 1095