benchmark-pump.c revision 1.1.1.1
1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22#include "task.h" 23#include "uv.h" 24 25#include <math.h> 26#include <stdio.h> 27 28 29static int TARGET_CONNECTIONS; 30#define WRITE_BUFFER_SIZE 8192 31#define MAX_SIMULTANEOUS_CONNECTS 100 32 33#define PRINT_STATS 0 34#define STATS_INTERVAL 1000 /* msec */ 35#define STATS_COUNT 5 36 37 38static void do_write(uv_stream_t*); 39static void maybe_connect_some(void); 40 41static uv_req_t* req_alloc(void); 42static void req_free(uv_req_t* uv_req); 43 44static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf); 45static void buf_free(const uv_buf_t* buf); 46 47static uv_loop_t* loop; 48 49static uv_tcp_t tcpServer; 50static uv_pipe_t pipeServer; 51static uv_stream_t* server; 52static struct sockaddr_in listen_addr; 53static struct sockaddr_in connect_addr; 54 55static int64_t start_time; 56 57static int max_connect_socket = 0; 58static int max_read_sockets = 0; 59static int read_sockets = 0; 60static int write_sockets = 0; 61 62static int64_t nrecv = 0; 63static int64_t nrecv_total = 0; 64static int64_t nsent = 0; 65static int64_t nsent_total = 0; 66 67static int stats_left = 0; 68 69static char write_buffer[WRITE_BUFFER_SIZE]; 70 71/* Make this as large as you need. */ 72#define MAX_WRITE_HANDLES 1000 73 74static stream_type type; 75 76static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES]; 77static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES]; 78 79static uv_timer_t timer_handle; 80 81 82static double gbit(int64_t bytes, int64_t passed_ms) { 83 double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8; 84 return gbits / ((double)passed_ms / 1000); 85} 86 87 88static void show_stats(uv_timer_t* handle) { 89 int64_t diff; 90 int i; 91 92#if PRINT_STATS 93 fprintf(stderr, "connections: %d, write: %.1f gbit/s\n", 94 write_sockets, 95 gbit(nsent, STATS_INTERVAL)); 96 fflush(stderr); 97#endif 98 99 /* Exit if the show is over */ 100 if (!--stats_left) { 101 102 uv_update_time(loop); 103 diff = uv_now(loop) - start_time; 104 105 fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n", 106 type == TCP ? "tcp" : "pipe", 107 write_sockets, 108 gbit(nsent_total, diff)); 109 fflush(stderr); 110 111 for (i = 0; i < write_sockets; i++) { 112 if (type == TCP) 113 uv_close((uv_handle_t*) &tcp_write_handles[i], NULL); 114 else 115 uv_close((uv_handle_t*) &pipe_write_handles[i], NULL); 116 } 117 118 exit(0); 119 } 120 121 /* Reset read and write counters */ 122 nrecv = 0; 123 nsent = 0; 124} 125 126 127static void read_show_stats(void) { 128 int64_t diff; 129 130 uv_update_time(loop); 131 diff = uv_now(loop) - start_time; 132 133 fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n", 134 type == TCP ? "tcp" : "pipe", 135 max_read_sockets, 136 gbit(nrecv_total, diff)); 137 fflush(stderr); 138} 139 140 141 142static void read_sockets_close_cb(uv_handle_t* handle) { 143 free(handle); 144 read_sockets--; 145 146 /* If it's past the first second and everyone has closed their connection 147 * Then print stats. 148 */ 149 if (uv_now(loop) - start_time > 1000 && read_sockets == 0) { 150 read_show_stats(); 151 uv_close((uv_handle_t*)server, NULL); 152 } 153} 154 155 156static void start_stats_collection(void) { 157 int r; 158 159 /* Show-stats timer */ 160 stats_left = STATS_COUNT; 161 r = uv_timer_init(loop, &timer_handle); 162 ASSERT(r == 0); 163 r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL); 164 ASSERT(r == 0); 165 166 uv_update_time(loop); 167 start_time = uv_now(loop); 168} 169 170 171static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) { 172 if (nrecv_total == 0) { 173 ASSERT(start_time == 0); 174 uv_update_time(loop); 175 start_time = uv_now(loop); 176 } 177 178 if (bytes < 0) { 179 uv_close((uv_handle_t*)stream, read_sockets_close_cb); 180 return; 181 } 182 183 buf_free(buf); 184 185 nrecv += bytes; 186 nrecv_total += bytes; 187} 188 189 190static void write_cb(uv_write_t* req, int status) { 191 ASSERT(status == 0); 192 193 req_free((uv_req_t*) req); 194 195 nsent += sizeof write_buffer; 196 nsent_total += sizeof write_buffer; 197 198 do_write((uv_stream_t*) req->handle); 199} 200 201 202static void do_write(uv_stream_t* stream) { 203 uv_write_t* req; 204 uv_buf_t buf; 205 int r; 206 207 buf.base = (char*) &write_buffer; 208 buf.len = sizeof write_buffer; 209 210 req = (uv_write_t*) req_alloc(); 211 r = uv_write(req, stream, &buf, 1, write_cb); 212 ASSERT(r == 0); 213} 214 215 216static void connect_cb(uv_connect_t* req, int status) { 217 int i; 218 219 if (status) { 220 fprintf(stderr, "%s", uv_strerror(status)); 221 fflush(stderr); 222 } 223 ASSERT(status == 0); 224 225 write_sockets++; 226 req_free((uv_req_t*) req); 227 228 maybe_connect_some(); 229 230 if (write_sockets == TARGET_CONNECTIONS) { 231 start_stats_collection(); 232 233 /* Yay! start writing */ 234 for (i = 0; i < write_sockets; i++) { 235 if (type == TCP) 236 do_write((uv_stream_t*) &tcp_write_handles[i]); 237 else 238 do_write((uv_stream_t*) &pipe_write_handles[i]); 239 } 240 } 241} 242 243 244static void maybe_connect_some(void) { 245 uv_connect_t* req; 246 uv_tcp_t* tcp; 247 uv_pipe_t* pipe; 248 int r; 249 250 while (max_connect_socket < TARGET_CONNECTIONS && 251 max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) { 252 if (type == TCP) { 253 tcp = &tcp_write_handles[max_connect_socket++]; 254 255 r = uv_tcp_init(loop, tcp); 256 ASSERT(r == 0); 257 258 req = (uv_connect_t*) req_alloc(); 259 r = uv_tcp_connect(req, 260 tcp, 261 (const struct sockaddr*) &connect_addr, 262 connect_cb); 263 ASSERT(r == 0); 264 } else { 265 pipe = &pipe_write_handles[max_connect_socket++]; 266 267 r = uv_pipe_init(loop, pipe, 0); 268 ASSERT(r == 0); 269 270 req = (uv_connect_t*) req_alloc(); 271 uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb); 272 } 273 } 274} 275 276 277static void connection_cb(uv_stream_t* s, int status) { 278 uv_stream_t* stream; 279 int r; 280 281 ASSERT(server == s); 282 ASSERT(status == 0); 283 284 if (type == TCP) { 285 stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); 286 r = uv_tcp_init(loop, (uv_tcp_t*)stream); 287 ASSERT(r == 0); 288 } else { 289 stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); 290 r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); 291 ASSERT(r == 0); 292 } 293 294 r = uv_accept(s, stream); 295 ASSERT(r == 0); 296 297 r = uv_read_start(stream, buf_alloc, read_cb); 298 ASSERT(r == 0); 299 300 read_sockets++; 301 max_read_sockets++; 302} 303 304 305/* 306 * Request allocator 307 */ 308 309typedef struct req_list_s { 310 union uv_any_req uv_req; 311 struct req_list_s* next; 312} req_list_t; 313 314 315static req_list_t* req_freelist = NULL; 316 317 318static uv_req_t* req_alloc(void) { 319 req_list_t* req; 320 321 req = req_freelist; 322 if (req != NULL) { 323 req_freelist = req->next; 324 return (uv_req_t*) req; 325 } 326 327 req = (req_list_t*) malloc(sizeof *req); 328 return (uv_req_t*) req; 329} 330 331 332static void req_free(uv_req_t* uv_req) { 333 req_list_t* req = (req_list_t*) uv_req; 334 335 req->next = req_freelist; 336 req_freelist = req; 337} 338 339 340/* 341 * Buffer allocator 342 */ 343 344typedef struct buf_list_s { 345 uv_buf_t uv_buf_t; 346 struct buf_list_s* next; 347} buf_list_t; 348 349 350static buf_list_t* buf_freelist = NULL; 351 352 353static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) { 354 buf_list_t* ab; 355 356 ab = buf_freelist; 357 if (ab != NULL) 358 buf_freelist = ab->next; 359 else { 360 ab = malloc(size + sizeof(*ab)); 361 ab->uv_buf_t.len = size; 362 ab->uv_buf_t.base = (char*) (ab + 1); 363 } 364 365 *buf = ab->uv_buf_t; 366} 367 368 369static void buf_free(const uv_buf_t* buf) { 370 buf_list_t* ab = (buf_list_t*) buf->base - 1; 371 ab->next = buf_freelist; 372 buf_freelist = ab; 373} 374 375 376HELPER_IMPL(tcp_pump_server) { 377 int r; 378 379 type = TCP; 380 loop = uv_default_loop(); 381 382 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr)); 383 384 /* Server */ 385 server = (uv_stream_t*)&tcpServer; 386 r = uv_tcp_init(loop, &tcpServer); 387 ASSERT(r == 0); 388 r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0); 389 ASSERT(r == 0); 390 r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb); 391 ASSERT(r == 0); 392 393 uv_run(loop, UV_RUN_DEFAULT); 394 395 return 0; 396} 397 398 399HELPER_IMPL(pipe_pump_server) { 400 int r; 401 type = PIPE; 402 403 loop = uv_default_loop(); 404 405 /* Server */ 406 server = (uv_stream_t*)&pipeServer; 407 r = uv_pipe_init(loop, &pipeServer, 0); 408 ASSERT(r == 0); 409 r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); 410 ASSERT(r == 0); 411 r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb); 412 ASSERT(r == 0); 413 414 uv_run(loop, UV_RUN_DEFAULT); 415 416 MAKE_VALGRIND_HAPPY(); 417 return 0; 418} 419 420 421static void tcp_pump(int n) { 422 ASSERT(n <= MAX_WRITE_HANDLES); 423 TARGET_CONNECTIONS = n; 424 type = TCP; 425 426 loop = uv_default_loop(); 427 428 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr)); 429 430 /* Start making connections */ 431 maybe_connect_some(); 432 433 uv_run(loop, UV_RUN_DEFAULT); 434 435 MAKE_VALGRIND_HAPPY(); 436} 437 438 439static void pipe_pump(int n) { 440 ASSERT(n <= MAX_WRITE_HANDLES); 441 TARGET_CONNECTIONS = n; 442 type = PIPE; 443 444 loop = uv_default_loop(); 445 446 /* Start making connections */ 447 maybe_connect_some(); 448 449 uv_run(loop, UV_RUN_DEFAULT); 450 451 MAKE_VALGRIND_HAPPY(); 452} 453 454 455BENCHMARK_IMPL(tcp_pump100_client) { 456 tcp_pump(100); 457 return 0; 458} 459 460 461BENCHMARK_IMPL(tcp_pump1_client) { 462 tcp_pump(1); 463 return 0; 464} 465 466 467BENCHMARK_IMPL(pipe_pump100_client) { 468 pipe_pump(100); 469 return 0; 470} 471 472 473BENCHMARK_IMPL(pipe_pump1_client) { 474 pipe_pump(1); 475 return 0; 476} 477