benchmark-pump.c revision 1.1.1.1
1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "task.h"
23#include "uv.h"
24
25#include <math.h>
26#include <stdio.h>
27
28
29static int TARGET_CONNECTIONS;
30#define WRITE_BUFFER_SIZE           8192
31#define MAX_SIMULTANEOUS_CONNECTS   100
32
33#define PRINT_STATS                 0
34#define STATS_INTERVAL              1000 /* msec */
35#define STATS_COUNT                 5
36
37
38static void do_write(uv_stream_t*);
39static void maybe_connect_some(void);
40
41static uv_req_t* req_alloc(void);
42static void req_free(uv_req_t* uv_req);
43
44static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
45static void buf_free(const uv_buf_t* buf);
46
47static uv_loop_t* loop;
48
49static uv_tcp_t tcpServer;
50static uv_pipe_t pipeServer;
51static uv_stream_t* server;
52static struct sockaddr_in listen_addr;
53static struct sockaddr_in connect_addr;
54
55static int64_t start_time;
56
57static int max_connect_socket = 0;
58static int max_read_sockets = 0;
59static int read_sockets = 0;
60static int write_sockets = 0;
61
62static int64_t nrecv = 0;
63static int64_t nrecv_total = 0;
64static int64_t nsent = 0;
65static int64_t nsent_total = 0;
66
67static int stats_left = 0;
68
69static char write_buffer[WRITE_BUFFER_SIZE];
70
71/* Make this as large as you need. */
72#define MAX_WRITE_HANDLES 1000
73
74static stream_type type;
75
76static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
77static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
78
79static uv_timer_t timer_handle;
80
81
82static double gbit(int64_t bytes, int64_t passed_ms) {
83  double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8;
84  return gbits / ((double)passed_ms / 1000);
85}
86
87
88static void show_stats(uv_timer_t* handle) {
89  int64_t diff;
90  int i;
91
92#if PRINT_STATS
93  fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
94          write_sockets,
95          gbit(nsent, STATS_INTERVAL));
96  fflush(stderr);
97#endif
98
99  /* Exit if the show is over */
100  if (!--stats_left) {
101
102    uv_update_time(loop);
103    diff = uv_now(loop) - start_time;
104
105    fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
106            type == TCP ? "tcp" : "pipe",
107            write_sockets,
108            gbit(nsent_total, diff));
109    fflush(stderr);
110
111    for (i = 0; i < write_sockets; i++) {
112      if (type == TCP)
113        uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);
114      else
115        uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);
116    }
117
118    exit(0);
119  }
120
121  /* Reset read and write counters */
122  nrecv = 0;
123  nsent = 0;
124}
125
126
127static void read_show_stats(void) {
128  int64_t diff;
129
130  uv_update_time(loop);
131  diff = uv_now(loop) - start_time;
132
133  fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134          type == TCP ? "tcp" : "pipe",
135          max_read_sockets,
136          gbit(nrecv_total, diff));
137  fflush(stderr);
138}
139
140
141
142static void read_sockets_close_cb(uv_handle_t* handle) {
143  free(handle);
144  read_sockets--;
145
146  /* If it's past the first second and everyone has closed their connection
147   * Then print stats.
148   */
149  if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
150    read_show_stats();
151    uv_close((uv_handle_t*)server, NULL);
152  }
153}
154
155
156static void start_stats_collection(void) {
157  int r;
158
159  /* Show-stats timer */
160  stats_left = STATS_COUNT;
161  r = uv_timer_init(loop, &timer_handle);
162  ASSERT(r == 0);
163  r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
164  ASSERT(r == 0);
165
166  uv_update_time(loop);
167  start_time = uv_now(loop);
168}
169
170
171static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
172  if (nrecv_total == 0) {
173    ASSERT(start_time == 0);
174    uv_update_time(loop);
175    start_time = uv_now(loop);
176  }
177
178  if (bytes < 0) {
179    uv_close((uv_handle_t*)stream, read_sockets_close_cb);
180    return;
181  }
182
183  buf_free(buf);
184
185  nrecv += bytes;
186  nrecv_total += bytes;
187}
188
189
190static void write_cb(uv_write_t* req, int status) {
191  ASSERT(status == 0);
192
193  req_free((uv_req_t*) req);
194
195  nsent += sizeof write_buffer;
196  nsent_total += sizeof write_buffer;
197
198  do_write((uv_stream_t*) req->handle);
199}
200
201
202static void do_write(uv_stream_t* stream) {
203  uv_write_t* req;
204  uv_buf_t buf;
205  int r;
206
207  buf.base = (char*) &write_buffer;
208  buf.len = sizeof write_buffer;
209
210  req = (uv_write_t*) req_alloc();
211  r = uv_write(req, stream, &buf, 1, write_cb);
212  ASSERT(r == 0);
213}
214
215
216static void connect_cb(uv_connect_t* req, int status) {
217  int i;
218
219  if (status) {
220    fprintf(stderr, "%s", uv_strerror(status));
221    fflush(stderr);
222  }
223  ASSERT(status == 0);
224
225  write_sockets++;
226  req_free((uv_req_t*) req);
227
228  maybe_connect_some();
229
230  if (write_sockets == TARGET_CONNECTIONS) {
231    start_stats_collection();
232
233    /* Yay! start writing */
234    for (i = 0; i < write_sockets; i++) {
235      if (type == TCP)
236        do_write((uv_stream_t*) &tcp_write_handles[i]);
237      else
238        do_write((uv_stream_t*) &pipe_write_handles[i]);
239    }
240  }
241}
242
243
244static void maybe_connect_some(void) {
245  uv_connect_t* req;
246  uv_tcp_t* tcp;
247  uv_pipe_t* pipe;
248  int r;
249
250  while (max_connect_socket < TARGET_CONNECTIONS &&
251         max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
252    if (type == TCP) {
253      tcp = &tcp_write_handles[max_connect_socket++];
254
255      r = uv_tcp_init(loop, tcp);
256      ASSERT(r == 0);
257
258      req = (uv_connect_t*) req_alloc();
259      r = uv_tcp_connect(req,
260                         tcp,
261                         (const struct sockaddr*) &connect_addr,
262                         connect_cb);
263      ASSERT(r == 0);
264    } else {
265      pipe = &pipe_write_handles[max_connect_socket++];
266
267      r = uv_pipe_init(loop, pipe, 0);
268      ASSERT(r == 0);
269
270      req = (uv_connect_t*) req_alloc();
271      uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
272    }
273  }
274}
275
276
277static void connection_cb(uv_stream_t* s, int status) {
278  uv_stream_t* stream;
279  int r;
280
281  ASSERT(server == s);
282  ASSERT(status == 0);
283
284  if (type == TCP) {
285    stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
286    r = uv_tcp_init(loop, (uv_tcp_t*)stream);
287    ASSERT(r == 0);
288  } else {
289    stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
290    r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
291    ASSERT(r == 0);
292  }
293
294  r = uv_accept(s, stream);
295  ASSERT(r == 0);
296
297  r = uv_read_start(stream, buf_alloc, read_cb);
298  ASSERT(r == 0);
299
300  read_sockets++;
301  max_read_sockets++;
302}
303
304
305/*
306 * Request allocator
307 */
308
309typedef struct req_list_s {
310  union uv_any_req uv_req;
311  struct req_list_s* next;
312} req_list_t;
313
314
315static req_list_t* req_freelist = NULL;
316
317
318static uv_req_t* req_alloc(void) {
319  req_list_t* req;
320
321  req = req_freelist;
322  if (req != NULL) {
323    req_freelist = req->next;
324    return (uv_req_t*) req;
325  }
326
327  req = (req_list_t*) malloc(sizeof *req);
328  return (uv_req_t*) req;
329}
330
331
332static void req_free(uv_req_t* uv_req) {
333  req_list_t* req = (req_list_t*) uv_req;
334
335  req->next = req_freelist;
336  req_freelist = req;
337}
338
339
340/*
341 * Buffer allocator
342 */
343
344typedef struct buf_list_s {
345  uv_buf_t uv_buf_t;
346  struct buf_list_s* next;
347} buf_list_t;
348
349
350static buf_list_t* buf_freelist = NULL;
351
352
353static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
354  buf_list_t* ab;
355
356  ab = buf_freelist;
357  if (ab != NULL)
358    buf_freelist = ab->next;
359  else {
360    ab = malloc(size + sizeof(*ab));
361    ab->uv_buf_t.len = size;
362    ab->uv_buf_t.base = (char*) (ab + 1);
363  }
364
365  *buf = ab->uv_buf_t;
366}
367
368
369static void buf_free(const uv_buf_t* buf) {
370  buf_list_t* ab = (buf_list_t*) buf->base - 1;
371  ab->next = buf_freelist;
372  buf_freelist = ab;
373}
374
375
376HELPER_IMPL(tcp_pump_server) {
377  int r;
378
379  type = TCP;
380  loop = uv_default_loop();
381
382  ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383
384  /* Server */
385  server = (uv_stream_t*)&tcpServer;
386  r = uv_tcp_init(loop, &tcpServer);
387  ASSERT(r == 0);
388  r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389  ASSERT(r == 0);
390  r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
391  ASSERT(r == 0);
392
393  uv_run(loop, UV_RUN_DEFAULT);
394
395  return 0;
396}
397
398
399HELPER_IMPL(pipe_pump_server) {
400  int r;
401  type = PIPE;
402
403  loop = uv_default_loop();
404
405  /* Server */
406  server = (uv_stream_t*)&pipeServer;
407  r = uv_pipe_init(loop, &pipeServer, 0);
408  ASSERT(r == 0);
409  r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
410  ASSERT(r == 0);
411  r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
412  ASSERT(r == 0);
413
414  uv_run(loop, UV_RUN_DEFAULT);
415
416  MAKE_VALGRIND_HAPPY();
417  return 0;
418}
419
420
421static void tcp_pump(int n) {
422  ASSERT(n <= MAX_WRITE_HANDLES);
423  TARGET_CONNECTIONS = n;
424  type = TCP;
425
426  loop = uv_default_loop();
427
428  ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
429
430  /* Start making connections */
431  maybe_connect_some();
432
433  uv_run(loop, UV_RUN_DEFAULT);
434
435  MAKE_VALGRIND_HAPPY();
436}
437
438
439static void pipe_pump(int n) {
440  ASSERT(n <= MAX_WRITE_HANDLES);
441  TARGET_CONNECTIONS = n;
442  type = PIPE;
443
444  loop = uv_default_loop();
445
446  /* Start making connections */
447  maybe_connect_some();
448
449  uv_run(loop, UV_RUN_DEFAULT);
450
451  MAKE_VALGRIND_HAPPY();
452}
453
454
455BENCHMARK_IMPL(tcp_pump100_client) {
456  tcp_pump(100);
457  return 0;
458}
459
460
461BENCHMARK_IMPL(tcp_pump1_client) {
462  tcp_pump(1);
463  return 0;
464}
465
466
467BENCHMARK_IMPL(pipe_pump100_client) {
468  pipe_pump(100);
469  return 0;
470}
471
472
473BENCHMARK_IMPL(pipe_pump1_client) {
474  pipe_pump(1);
475  return 0;
476}
477