1/*- 2 * Copyright (c) 2008-2009 Robert N. M. Watson 3 * Copyright (c) 2010 Juniper Networks, Inc. 4 * All rights reserved. 5 * 6 * This software was developed by Robert N. M. Watson under contract 7 * to Juniper Networks, Inc. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions 11 * are met: 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28 * SUCH DAMAGE. 29 * 30 * $FreeBSD$ 31 */ 32 33#include <sys/types.h> 34#include <sys/event.h> 35#include <sys/resource.h> 36#include <sys/sched.h> 37#include <sys/socket.h> 38#include <sys/sysctl.h> 39#include <sys/time.h> 40#include <sys/uio.h> 41#include <sys/wait.h> 42 43#include <netinet/in.h> 44#include <netinet/tcp.h> 45 46#include <err.h> 47#include <errno.h> 48#include <fcntl.h> 49#include <inttypes.h> 50#include <signal.h> 51#include <stdio.h> 52#include <stdlib.h> 53#include <string.h> 54#include <unistd.h> 55 56#include "tcpp.h" 57 58#define min(x, y) (x < y ? x : y) 59 60#define timespecsub(vvp, uvp) \ 61 do { \ 62 (vvp)->tv_sec -= (uvp)->tv_sec; \ 63 (vvp)->tv_nsec -= (uvp)->tv_nsec; \ 64 if ((vvp)->tv_nsec < 0) { \ 65 (vvp)->tv_sec--; \ 66 (vvp)->tv_nsec += 1000000000; \ 67 } \ 68 } while (0) 69 70 71/* 72 * Gist of each client worker: build up to mflag connections at a time, and 73 * pump data in to them somewhat fairly until tflag connections have been 74 * completed. 75 */ 76#define CONNECTION_MAGIC 0x87a3f56e 77struct connection { 78 uint32_t conn_magic; /* Just magic. */ 79 int conn_fd; 80 struct tcpp_header conn_header; /* Header buffer. */ 81 u_int conn_header_sent; /* Header bytes sent. */ 82 u_int64_t conn_data_sent; /* Data bytes sent.*/ 83}; 84 85static u_char buffer[256 * 1024]; /* Buffer to send. */ 86static pid_t *pid_list; 87static int kq; 88static int started; /* Number started so far. */ 89static int finished; /* Number finished so far. */ 90static int counter; /* IP number offset. */ 91static uint64_t payload_len; 92 93static struct connection * 94tcpp_client_newconn(void) 95{ 96 struct sockaddr_in sin; 97 struct connection *conn; 98 struct kevent kev; 99 int fd, i; 100 101 /* 102 * Spread load over available IPs, rotating through them as we go. No 103 * attempt to localize IPs to particular workers. 104 */ 105 sin = localipbase; 106 sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) + 107 (counter++ % Mflag)); 108 109 fd = socket(PF_INET, SOCK_STREAM, 0); 110 if (fd < 0) 111 err(-1, "socket"); 112 113 if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) 114 err(-1, "fcntl"); 115 116 i = 1; 117 if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0) 118 err(-1, "setsockopt"); 119 i = 1; 120 if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)) < 0) 121 err(-1, "setsockopt"); 122#if 0 123 i = 1; 124 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0) 125 err(-1, "setsockopt"); 126#endif 127 128 if (lflag) { 129 if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) 130 err(-1, "bind"); 131 } 132 133 if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 && 134 errno != EINPROGRESS) 135 err(-1, "connect"); 136 137 conn = malloc(sizeof(*conn)); 138 if (conn == NULL) 139 return (NULL); 140 bzero(conn, sizeof(*conn)); 141 conn->conn_magic = CONNECTION_MAGIC; 142 conn->conn_fd = fd; 143 conn->conn_header.th_magic = TCPP_MAGIC; 144 conn->conn_header.th_len = payload_len; 145 tcpp_header_encode(&conn->conn_header); 146 147 EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn); 148 if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0) 149 err(-1, "newconn kevent"); 150 151 started++; 152 return (conn); 153} 154 155static void 156tcpp_client_closeconn(struct connection *conn) 157{ 158 159 close(conn->conn_fd); 160 bzero(conn, sizeof(*conn)); 161 free(conn); 162 finished++; 163} 164 165static void 166tcpp_client_handleconn(struct kevent *kev) 167{ 168 struct connection *conn; 169 struct iovec iov[2]; 170 ssize_t len, header_left; 171 172 conn = kev->udata; 173 if (conn->conn_magic != CONNECTION_MAGIC) 174 errx(-1, "tcpp_client_handleconn: magic"); 175 176 if (conn->conn_header_sent < sizeof(conn->conn_header)) { 177 header_left = sizeof(conn->conn_header) - 178 conn->conn_header_sent; 179 iov[0].iov_base = ((u_char *)&conn->conn_header) + 180 conn->conn_header_sent; 181 iov[0].iov_len = header_left; 182 iov[1].iov_base = buffer; 183 iov[1].iov_len = min(sizeof(buffer), payload_len); 184 len = writev(conn->conn_fd, iov, 2); 185 if (len < 0) { 186 tcpp_client_closeconn(conn); 187 err(-1, "tcpp_client_handleconn: header write"); 188 } 189 if (len == 0) { 190 tcpp_client_closeconn(conn); 191 errx(-1, "tcpp_client_handleconn: header write " 192 "premature EOF"); 193 } 194 if (len > header_left) { 195 conn->conn_data_sent += (len - header_left); 196 conn->conn_header_sent += header_left; 197 } else 198 conn->conn_header_sent += len; 199 } else { 200 len = write(conn->conn_fd, buffer, min(sizeof(buffer), 201 payload_len - conn->conn_data_sent)); 202 if (len < 0) { 203 tcpp_client_closeconn(conn); 204 err(-1, "tcpp_client_handleconn: data write"); 205 } 206 if (len == 0) { 207 tcpp_client_closeconn(conn); 208 errx(-1, "tcpp_client_handleconn: data write: " 209 "premature EOF"); 210 } 211 conn->conn_data_sent += len; 212 } 213 if (conn->conn_data_sent >= payload_len) { 214 /* 215 * All is well. 216 */ 217 tcpp_client_closeconn(conn); 218 } 219} 220 221static void 222tcpp_client_worker(int workernum) 223{ 224 struct kevent *kev_array; 225 int i, numevents, kev_bytes; 226#if defined(CPU_SETSIZE) && 0 227 cpu_set_t mask; 228 int ncpus; 229 size_t len; 230 231 if (Pflag) { 232 len = sizeof(ncpus); 233 if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0) 234 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS); 235 if (len != sizeof(ncpus)) 236 errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS, 237 (intmax_t)len); 238 239 CPU_ZERO(&mask); 240 CPU_SET(workernum % ncpus, &mask); 241 if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0) 242 err(-1, "sched_setaffinity"); 243 } 244#endif 245 setproctitle("tcpp_client %d", workernum); 246 247 /* 248 * Add the worker number to the remote port. 249 */ 250 remoteip.sin_port = htons(rflag + workernum); 251 252 kev_bytes = sizeof(*kev_array) * mflag; 253 kev_array = malloc(kev_bytes); 254 if (kev_array == NULL) 255 err(-1, "malloc"); 256 bzero(kev_array, kev_bytes); 257 258 kq = kqueue(); 259 if (kq < 0) 260 err(-1, "kqueue"); 261 262 while (finished < tflag) { 263 while ((started - finished < mflag) && (started < tflag)) 264 (void)tcpp_client_newconn(); 265 numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL); 266 if (numevents < 0) 267 err(-1, "kevent"); 268 if (numevents > mflag) 269 errx(-1, "kevent: %d", numevents); 270 for (i = 0; i < numevents; i++) 271 tcpp_client_handleconn(&kev_array[i]); 272 } 273 /* printf("Worker %d done - %d finished\n", workernum, finished); */ 274} 275 276void 277tcpp_client(void) 278{ 279 struct timespec ts_start, ts_finish; 280 long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES]; 281 long ticks; 282 size_t size; 283 pid_t pid; 284 int i, failed, status; 285 286 if (bflag < sizeof(struct tcpp_header)) 287 errx(-1, "Can't use -b less than %zu\n", 288 sizeof(struct tcpp_header)); 289 payload_len = bflag - sizeof(struct tcpp_header); 290 291 pid_list = malloc(sizeof(*pid_list) * pflag); 292 if (pid_list == NULL) 293 err(-1, "malloc pid_list"); 294 bzero(pid_list, sizeof(*pid_list) * pflag); 295 296 /* 297 * Start workers. 298 */ 299 size = sizeof(cp_time_start); 300 if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0) 301 < 0) 302 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME); 303 if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0) 304 err(-1, "clock_gettime"); 305 for (i = 0; i < pflag; i++) { 306 pid = fork(); 307 if (pid < 0) { 308 warn("fork"); 309 for (i = 0; i < pflag; i++) { 310 if (pid_list[i] != 0) 311 (void)kill(pid_list[i], SIGKILL); 312 } 313 exit(-1); 314 } 315 if (pid == 0) { 316 tcpp_client_worker(i); 317 exit(0); 318 } 319 pid_list[i] = pid; 320 } 321 322 /* 323 * GC workers. 324 */ 325 failed = 0; 326 for (i = 0; i < pflag; i++) { 327 if (pid_list[i] != 0) { 328 while (waitpid(pid_list[i], &status, 0) != pid_list[i]); 329 if (WEXITSTATUS(status) != 0) 330 failed = 1; 331 } 332 } 333 if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0) 334 err(-1, "clock_gettime"); 335 size = sizeof(cp_time_finish); 336 if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0) 337 < 0) 338 err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME); 339 timespecsub(&ts_finish, &ts_start); 340 341 if (failed) 342 errx(-1, "Too many errors"); 343 344 if (hflag) 345 printf("bytes,seconds,conn/s,Gb/s,user%%,nice%%,sys%%," 346 "intr%%,idle%%\n"); 347 348 /* 349 * Configuration parameters. 350 */ 351 printf("%jd,", bflag * tflag * pflag); 352 printf("%jd.%09jd,", (intmax_t)ts_finish.tv_sec, 353 (intmax_t)(ts_finish.tv_nsec)); 354 355 /* 356 * Effective transmit rates. 357 */ 358 printf("%f,", (double)(pflag * tflag)/ 359 (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9)); 360 printf("%f,", (double)(bflag * tflag * pflag * 8) / 361 (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9); 362 363 /* 364 * CPU time (est). 365 */ 366 ticks = 0; 367 for (i = 0; i < CPUSTATES; i++) { 368 cp_time_finish[i] -= cp_time_start[i]; 369 ticks += cp_time_finish[i]; 370 } 371 printf("%0.02f,", (float)(100 * cp_time_finish[CP_USER]) / ticks); 372 printf("%0.02f,", (float)(100 * cp_time_finish[CP_NICE]) / ticks); 373 printf("%0.02f,", (float)(100 * cp_time_finish[CP_SYS]) / ticks); 374 printf("%0.02f,", (float)(100 * cp_time_finish[CP_INTR]) / ticks); 375 printf("%0.02f", (float)(100 * cp_time_finish[CP_IDLE]) / ticks); 376 printf("\n"); 377} 378