1/* 2 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 3. The name of the author may not be used to endorse or promote products 13 * derived from this software without specific prior written permission. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 */ 26 27#include <stdio.h> 28#include <stdlib.h> 29#include <string.h> 30#include <assert.h> 31#include <math.h> 32 33#ifdef WIN32 34#include <winsock2.h> 35#include <ws2tcpip.h> 36#else 37#include <sys/socket.h> 38#include <netinet/in.h> 39# ifdef _XOPEN_SOURCE_EXTENDED 40# include <arpa/inet.h> 41# endif 42#endif 43#include <signal.h> 44 45#include "event2/bufferevent.h" 46#include "event2/buffer.h" 47#include "event2/event.h" 48#include "event2/util.h" 49#include "event2/listener.h" 50#include "event2/thread.h" 51 52#include "../util-internal.h" 53 54static int cfg_verbose = 0; 55static int cfg_help = 0; 56 57static int cfg_n_connections = 30; 58static int cfg_duration = 5; 59static int cfg_connlimit = 0; 60static int cfg_grouplimit = 0; 61static int cfg_tick_msec = 1000; 62static int cfg_min_share = -1; 63 64static int cfg_connlimit_tolerance = -1; 65static int cfg_grouplimit_tolerance = -1; 66static int cfg_stddev_tolerance = -1; 67 68#ifdef _WIN32 69static int cfg_enable_iocp = 0; 70#endif 71 72static struct timeval cfg_tick = { 0, 500*1000 }; 73 74static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL; 75static struct ev_token_bucket_cfg *group_bucket_cfg = NULL; 76struct bufferevent_rate_limit_group *ratelim_group = NULL; 77static double seconds_per_tick = 0.0; 78 79struct client_state { 80 size_t queued; 81 ev_uint64_t received; 82}; 83 84static int n_echo_conns_open = 0; 85 86static void 87loud_writecb(struct bufferevent *bev, void *ctx) 88{ 89 struct client_state *cs = ctx; 90 struct evbuffer *output = bufferevent_get_output(bev); 91 char buf[1024]; 92#ifdef WIN32 93 int r = rand() % 256; 94#else 95 int r = random() % 256; 96#endif 97 memset(buf, r, sizeof(buf)); 98 while (evbuffer_get_length(output) < 8192) { 99 evbuffer_add(output, buf, sizeof(buf)); 100 cs->queued += sizeof(buf); 101 } 102} 103 104static void 105discard_readcb(struct bufferevent *bev, void *ctx) 106{ 107 struct client_state *cs = ctx; 108 struct evbuffer *input = bufferevent_get_input(bev); 109 size_t len = evbuffer_get_length(input); 110 evbuffer_drain(input, len); 111 cs->received += len; 112} 113 114static void 115write_on_connectedcb(struct bufferevent *bev, short what, void *ctx) 116{ 117 if (what & BEV_EVENT_CONNECTED) { 118 loud_writecb(bev, ctx); 119 /* XXXX this shouldn't be needed. */ 120 bufferevent_enable(bev, EV_READ|EV_WRITE); 121 } 122} 123 124static void 125echo_readcb(struct bufferevent *bev, void *ctx) 126{ 127 struct evbuffer *input = bufferevent_get_input(bev); 128 struct evbuffer *output = bufferevent_get_output(bev); 129 130 evbuffer_add_buffer(output, input); 131 if (evbuffer_get_length(output) > 1024000) 132 bufferevent_disable(bev, EV_READ); 133} 134 135static void 136echo_writecb(struct bufferevent *bev, void *ctx) 137{ 138 struct evbuffer *output = bufferevent_get_output(bev); 139 if (evbuffer_get_length(output) < 512000) 140 bufferevent_enable(bev, EV_READ); 141} 142 143static void 144echo_eventcb(struct bufferevent *bev, short what, void *ctx) 145{ 146 if (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR)) { 147 --n_echo_conns_open; 148 bufferevent_free(bev); 149 } 150} 151 152static void 153echo_listenercb(struct evconnlistener *listener, evutil_socket_t newsock, 154 struct sockaddr *sourceaddr, int socklen, void *ctx) 155{ 156 struct event_base *base = ctx; 157 int flags = BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE; 158 struct bufferevent *bev; 159 160 bev = bufferevent_socket_new(base, newsock, flags); 161 bufferevent_setcb(bev, echo_readcb, echo_writecb, echo_eventcb, NULL); 162 if (conn_bucket_cfg) 163 bufferevent_set_rate_limit(bev, conn_bucket_cfg); 164 if (ratelim_group) 165 bufferevent_add_to_rate_limit_group(bev, ratelim_group); 166 ++n_echo_conns_open; 167 bufferevent_enable(bev, EV_READ|EV_WRITE); 168} 169 170static int 171test_ratelimiting(void) 172{ 173 struct event_base *base; 174 struct sockaddr_in sin; 175 struct evconnlistener *listener; 176 177 struct sockaddr_storage ss; 178 ev_socklen_t slen; 179 180 struct bufferevent **bevs; 181 struct client_state *states; 182 struct bufferevent_rate_limit_group *group = NULL; 183 184 int i; 185 186 struct timeval tv; 187 188 ev_uint64_t total_received; 189 double total_sq_persec, total_persec; 190 double variance; 191 double expected_total_persec = -1.0, expected_avg_persec = -1.0; 192 int ok = 1; 193 struct event_config *base_cfg; 194 195 memset(&sin, 0, sizeof(sin)); 196 sin.sin_family = AF_INET; 197 sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ 198 sin.sin_port = 0; /* unspecified port */ 199 200 if (0) 201 event_enable_debug_mode(); 202 203 base_cfg = event_config_new(); 204 205#ifdef _WIN32 206 if (cfg_enable_iocp) { 207 evthread_use_windows_threads(); 208 event_config_set_flag(base_cfg, EVENT_BASE_FLAG_STARTUP_IOCP); 209 } 210#endif 211 212 base = event_base_new_with_config(base_cfg); 213 event_config_free(base_cfg); 214 215 listener = evconnlistener_new_bind(base, echo_listenercb, base, 216 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, 217 (struct sockaddr *)&sin, sizeof(sin)); 218 219 slen = sizeof(ss); 220 if (getsockname(evconnlistener_get_fd(listener), (struct sockaddr *)&ss, 221 &slen) < 0) { 222 perror("getsockname"); 223 return 1; 224 } 225 226 if (cfg_connlimit > 0) { 227 conn_bucket_cfg = ev_token_bucket_cfg_new( 228 cfg_connlimit, cfg_connlimit * 4, 229 cfg_connlimit, cfg_connlimit * 4, 230 &cfg_tick); 231 assert(conn_bucket_cfg); 232 } 233 234 if (cfg_grouplimit > 0) { 235 group_bucket_cfg = ev_token_bucket_cfg_new( 236 cfg_grouplimit, cfg_grouplimit * 4, 237 cfg_grouplimit, cfg_grouplimit * 4, 238 &cfg_tick); 239 group = ratelim_group = bufferevent_rate_limit_group_new( 240 base, group_bucket_cfg); 241 expected_total_persec = cfg_grouplimit; 242 expected_avg_persec = cfg_grouplimit / cfg_n_connections; 243 if (cfg_connlimit > 0 && expected_avg_persec > cfg_connlimit) 244 expected_avg_persec = cfg_connlimit; 245 if (cfg_min_share >= 0) 246 bufferevent_rate_limit_group_set_min_share( 247 ratelim_group, cfg_min_share); 248 } 249 250 if (expected_avg_persec < 0 && cfg_connlimit > 0) 251 expected_avg_persec = cfg_connlimit; 252 253 if (expected_avg_persec > 0) 254 expected_avg_persec /= seconds_per_tick; 255 if (expected_total_persec > 0) 256 expected_total_persec /= seconds_per_tick; 257 258 bevs = calloc(cfg_n_connections, sizeof(struct bufferevent *)); 259 states = calloc(cfg_n_connections, sizeof(struct client_state)); 260 261 for (i = 0; i < cfg_n_connections; ++i) { 262 bevs[i] = bufferevent_socket_new(base, -1, 263 BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE); 264 assert(bevs[i]); 265 bufferevent_setcb(bevs[i], discard_readcb, loud_writecb, 266 write_on_connectedcb, &states[i]); 267 bufferevent_enable(bevs[i], EV_READ|EV_WRITE); 268 bufferevent_socket_connect(bevs[i], (struct sockaddr *)&ss, 269 slen); 270 } 271 272 tv.tv_sec = cfg_duration - 1; 273 tv.tv_usec = 995000; 274 275 event_base_loopexit(base, &tv); 276 277 event_base_dispatch(base); 278 279 ratelim_group = NULL; /* So no more responders get added */ 280 281 for (i = 0; i < cfg_n_connections; ++i) { 282 bufferevent_free(bevs[i]); 283 } 284 evconnlistener_free(listener); 285 286 /* Make sure no new echo_conns get added to the group. */ 287 ratelim_group = NULL; 288 289 /* This should get _everybody_ freed */ 290 while (n_echo_conns_open) { 291 printf("waiting for %d conns\n", n_echo_conns_open); 292 tv.tv_sec = 0; 293 tv.tv_usec = 300000; 294 event_base_loopexit(base, &tv); 295 event_base_dispatch(base); 296 } 297 298 if (group) 299 bufferevent_rate_limit_group_free(group); 300 301 total_received = 0; 302 total_persec = 0.0; 303 total_sq_persec = 0.0; 304 for (i=0; i < cfg_n_connections; ++i) { 305 double persec = states[i].received; 306 persec /= cfg_duration; 307 total_received += states[i].received; 308 total_persec += persec; 309 total_sq_persec += persec*persec; 310 printf("%d: %f per second\n", i+1, persec); 311 } 312 printf(" total: %f per second\n", 313 ((double)total_received)/cfg_duration); 314 if (expected_total_persec > 0) { 315 double diff = expected_total_persec - 316 ((double)total_received/cfg_duration); 317 printf(" [Off by %lf]\n", diff); 318 if (cfg_grouplimit_tolerance > 0 && 319 fabs(diff) > cfg_grouplimit_tolerance) { 320 fprintf(stderr, "Group bandwidth out of bounds\n"); 321 ok = 0; 322 } 323 } 324 325 printf(" average: %f per second\n", 326 (((double)total_received)/cfg_duration)/cfg_n_connections); 327 if (expected_avg_persec > 0) { 328 double diff = expected_avg_persec - (((double)total_received)/cfg_duration)/cfg_n_connections; 329 printf(" [Off by %lf]\n", diff); 330 if (cfg_connlimit_tolerance > 0 && 331 fabs(diff) > cfg_connlimit_tolerance) { 332 fprintf(stderr, "Connection bandwidth out of bounds\n"); 333 ok = 0; 334 } 335 } 336 337 variance = total_sq_persec/cfg_n_connections - total_persec*total_persec/(cfg_n_connections*cfg_n_connections); 338 339 printf(" stddev: %f per second\n", sqrt(variance)); 340 if (cfg_stddev_tolerance > 0 && 341 sqrt(variance) > cfg_stddev_tolerance) { 342 fprintf(stderr, "Connection variance out of bounds\n"); 343 ok = 0; 344 } 345 346 event_base_free(base); 347 free(bevs); 348 free(states); 349 350 return ok ? 0 : 1; 351} 352 353static struct option { 354 const char *name; int *ptr; int min; int isbool; 355} options[] = { 356 { "-v", &cfg_verbose, 0, 1 }, 357 { "-h", &cfg_help, 0, 1 }, 358 { "-n", &cfg_n_connections, 1, 0 }, 359 { "-d", &cfg_duration, 1, 0 }, 360 { "-c", &cfg_connlimit, 0, 0 }, 361 { "-g", &cfg_grouplimit, 0, 0 }, 362 { "-t", &cfg_tick_msec, 10, 0 }, 363 { "--min-share", &cfg_min_share, 0, 0 }, 364 { "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 }, 365 { "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 }, 366 { "--check-stddev", &cfg_stddev_tolerance, 0, 0 }, 367#ifdef _WIN32 368 { "--iocp", &cfg_enable_iocp, 0, 1 }, 369#endif 370 { NULL, NULL, -1, 0 }, 371}; 372 373static int 374handle_option(int argc, char **argv, int *i, const struct option *opt) 375{ 376 long val; 377 char *endptr = NULL; 378 if (opt->isbool) { 379 *opt->ptr = 1; 380 return 0; 381 } 382 if (*i + 1 == argc) { 383 fprintf(stderr, "Too few arguments to '%s'\n",argv[*i]); 384 return -1; 385 } 386 val = strtol(argv[*i+1], &endptr, 10); 387 if (*argv[*i+1] == '\0' || !endptr || *endptr != '\0') { 388 fprintf(stderr, "Couldn't parse numeric value '%s'\n", 389 argv[*i+1]); 390 return -1; 391 } 392 if (val < opt->min || val > 0x7fffffff) { 393 fprintf(stderr, "Value '%s' is out-of-range'\n", 394 argv[*i+1]); 395 return -1; 396 } 397 *opt->ptr = (int)val; 398 ++*i; 399 return 0; 400} 401 402static void 403usage(void) 404{ 405 fprintf(stderr, 406"test-ratelim [-v] [-n INT] [-d INT] [-c INT] [-g INT] [-t INT]\n\n" 407"Pushes bytes through a number of possibly rate-limited connections, and\n" 408"displays average throughput.\n\n" 409" -n INT: Number of connections to open (default: 30)\n" 410" -d INT: Duration of the test in seconds (default: 5 sec)\n"); 411 fprintf(stderr, 412" -c INT: Connection-rate limit applied to each connection in bytes per second\n" 413" (default: None.)\n" 414" -g INT: Group-rate limit applied to sum of all usage in bytes per second\n" 415" (default: None.)\n" 416" -t INT: Granularity of timing, in milliseconds (default: 1000 msec)\n"); 417} 418 419int 420main(int argc, char **argv) 421{ 422 int i,j; 423 double ratio; 424 425#ifdef WIN32 426 WORD wVersionRequested = MAKEWORD(2,2); 427 WSADATA wsaData; 428 int err; 429 430 err = WSAStartup(wVersionRequested, &wsaData); 431#endif 432 433#ifndef WIN32 434 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) 435 return 1; 436#endif 437 for (i = 1; i < argc; ++i) { 438 for (j = 0; options[j].name; ++j) { 439 if (!strcmp(argv[i],options[j].name)) { 440 if (handle_option(argc,argv,&i,&options[j])<0) 441 return 1; 442 goto again; 443 } 444 } 445 fprintf(stderr, "Unknown option '%s'\n", argv[i]); 446 usage(); 447 return 1; 448 again: 449 ; 450 } 451 if (cfg_help) { 452 usage(); 453 return 0; 454 } 455 456 cfg_tick.tv_sec = cfg_tick_msec / 1000; 457 cfg_tick.tv_usec = (cfg_tick_msec % 1000)*1000; 458 459 seconds_per_tick = ratio = cfg_tick_msec / 1000.0; 460 461 cfg_connlimit *= ratio; 462 cfg_grouplimit *= ratio; 463 464 { 465 struct timeval tv; 466 evutil_gettimeofday(&tv, NULL); 467#ifdef WIN32 468 srand(tv.tv_usec); 469#else 470 srandom(tv.tv_usec); 471#endif 472 } 473 474#ifndef _EVENT_DISABLE_THREAD_SUPPORT 475 evthread_enable_lock_debuging(); 476#endif 477 478 return test_ratelimiting(); 479} 480