tcpp_client.c revision 206972
1189623Srwatson/*- 2189623Srwatson * Copyright (c) 2008-2009 Robert N. M. Watson 3189623Srwatson * All rights reserved. 4189623Srwatson * 5189623Srwatson * Redistribution and use in source and binary forms, with or without 6189623Srwatson * modification, are permitted provided that the following conditions 7189623Srwatson * are met: 8189623Srwatson * 1. Redistributions of source code must retain the above copyright 9189623Srwatson * notice, this list of conditions and the following disclaimer. 10189623Srwatson * 2. Redistributions in binary form must reproduce the above copyright 11189623Srwatson * notice, this list of conditions and the following disclaimer in the 12189623Srwatson * documentation and/or other materials provided with the distribution. 13189623Srwatson * 14189623Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15189623Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16189623Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17189623Srwatson * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18189623Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19189623Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20189623Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21189623Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22189623Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23189623Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24189623Srwatson * SUCH DAMAGE. 25189623Srwatson * 26189623Srwatson * $FreeBSD: head/tools/tools/netrate/tcpp/tcpp_client.c 206972 2010-04-21 00:52:55Z rwatson $ 27189623Srwatson */ 28189623Srwatson 29189623Srwatson#include <sys/types.h> 30189623Srwatson#include <sys/event.h> 31189623Srwatson#include <sys/resource.h> 32189623Srwatson#include <sys/sched.h> 33189623Srwatson#include <sys/socket.h> 34189623Srwatson#include <sys/sysctl.h> 35189623Srwatson#include <sys/time.h> 36206972Srwatson#include <sys/uio.h> 37189623Srwatson#include <sys/wait.h> 38189623Srwatson 39189623Srwatson#include <netinet/in.h> 40206972Srwatson#include <netinet/tcp.h> 41189623Srwatson 42189623Srwatson#include <err.h> 43189623Srwatson#include <errno.h> 44189623Srwatson#include <fcntl.h> 45189623Srwatson#include <inttypes.h> 46189623Srwatson#include <signal.h> 47189623Srwatson#include <stdio.h> 48189623Srwatson#include <stdlib.h> 49189623Srwatson#include <string.h> 50189623Srwatson#include <unistd.h> 51189623Srwatson 52189623Srwatson#include "tcpp.h" 53189623Srwatson 54189623Srwatson#define min(x, y) (x < y ? x : y) 55189623Srwatson 56189623Srwatson#define timespecsub(vvp, uvp) \ 57189623Srwatson do { \ 58189623Srwatson (vvp)->tv_sec -= (uvp)->tv_sec; \ 59189623Srwatson (vvp)->tv_nsec -= (uvp)->tv_nsec; \ 60189623Srwatson if ((vvp)->tv_nsec < 0) { \ 61189623Srwatson (vvp)->tv_sec--; \ 62189623Srwatson (vvp)->tv_nsec += 1000000000; \ 63189623Srwatson } \ 64189623Srwatson } while (0) 65189623Srwatson 66189623Srwatson 67189623Srwatson/* 68189623Srwatson * Gist of each client worker: build up to mflag connections at a time, and 69189623Srwatson * pump data in to them somewhat fairly until tflag connections have been 70189623Srwatson * completed. 71189623Srwatson */ 72189623Srwatson#define CONNECTION_MAGIC 0x87a3f56e 73189623Srwatsonstruct connection { 74189623Srwatson uint32_t conn_magic; /* Just magic. */ 75189623Srwatson int conn_fd; 76189623Srwatson struct tcpp_header conn_header; /* Header buffer. */ 77189623Srwatson u_int conn_header_sent; /* Header bytes sent. */ 78189623Srwatson u_int64_t conn_data_sent; /* Data bytes sent.*/ 79189623Srwatson}; 80189623Srwatson 81189623Srwatsonstatic u_char buffer[256 * 1024]; /* Buffer to send. */ 82189623Srwatsonstatic pid_t *pid_list; 83189623Srwatsonstatic int kq; 84189623Srwatsonstatic int started; /* Number started so far. */ 85189623Srwatsonstatic int finished; /* Number finished so far. */ 86189623Srwatsonstatic int counter; /* IP number offset. */ 87206972Srwatsonstatic uint64_t payload_len; 88189623Srwatson 89189623Srwatsonstatic struct connection * 90189623Srwatsontcpp_client_newconn(void) 91189623Srwatson{ 92189623Srwatson struct sockaddr_in sin; 93189623Srwatson struct connection *conn; 94189623Srwatson struct kevent kev; 95189623Srwatson int fd, i; 96189623Srwatson 97189623Srwatson /* 98189623Srwatson * Spread load over available IPs, roating through them as we go. No 99189623Srwatson * attempt to localize IPs to particular workers. 100189623Srwatson */ 101189623Srwatson sin = localipbase; 102189623Srwatson sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) + 103189623Srwatson (counter++ % Mflag)); 104189623Srwatson 105189623Srwatson fd = socket(PF_INET, SOCK_STREAM, 0); 106189623Srwatson if (fd < 0) 107189623Srwatson err(-1, "socket"); 108189623Srwatson 109189623Srwatson if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) 110189623Srwatson err(-1, "fcntl"); 111189623Srwatson 112189623Srwatson i = 1; 113189623Srwatson if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0) 114189623Srwatson err(-1, "setsockopt"); 115206972Srwatson i = 1; 116206972Srwatson if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)) < 0) 117206972Srwatson err(-1, "setsockopt"); 118189623Srwatson#if 0 119189623Srwatson i = 1; 120189623Srwatson if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0) 121189623Srwatson err(-1, "setsockopt"); 122189623Srwatson#endif 123189623Srwatson 124189623Srwatson if (lflag) { 125189623Srwatson if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) 126189623Srwatson err(-1, "bind"); 127189623Srwatson } 128189623Srwatson 129189623Srwatson if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 && 130189623Srwatson errno != EINPROGRESS) 131189623Srwatson err(-1, "connect"); 132189623Srwatson 133189623Srwatson conn = malloc(sizeof(*conn)); 134189623Srwatson if (conn == NULL) 135189623Srwatson return (NULL); 136189623Srwatson bzero(conn, sizeof(*conn)); 137189623Srwatson conn->conn_magic = CONNECTION_MAGIC; 138189623Srwatson conn->conn_fd = fd; 139189623Srwatson conn->conn_header.th_magic = TCPP_MAGIC; 140206972Srwatson conn->conn_header.th_len = payload_len; 141189623Srwatson tcpp_header_encode(&conn->conn_header); 142189623Srwatson 143189623Srwatson EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn); 144189623Srwatson if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0) 145189623Srwatson err(-1, "newconn kevent"); 146189623Srwatson 147189623Srwatson started++; 148189623Srwatson return (conn); 149189623Srwatson} 150189623Srwatson 151189623Srwatsonstatic void 152189623Srwatsontcpp_client_closeconn(struct connection *conn) 153189623Srwatson{ 154189623Srwatson 155189623Srwatson close(conn->conn_fd); 156189623Srwatson bzero(conn, sizeof(*conn)); 157189623Srwatson free(conn); 158189623Srwatson finished++; 159189623Srwatson} 160189623Srwatson 161189623Srwatsonstatic void 162189623Srwatsontcpp_client_handleconn(struct kevent *kev) 163189623Srwatson{ 164189623Srwatson struct connection *conn; 165206972Srwatson struct iovec iov[2]; 166206972Srwatson ssize_t len, header_left; 167189623Srwatson 168189623Srwatson conn = kev->udata; 169189623Srwatson if (conn->conn_magic != CONNECTION_MAGIC) 170189623Srwatson errx(-1, "tcpp_client_handleconn: magic"); 171189623Srwatson 172189623Srwatson if (conn->conn_header_sent < sizeof(conn->conn_header)) { 173206972Srwatson header_left = sizeof(conn->conn_header) - 174206972Srwatson conn->conn_header_sent; 175206972Srwatson iov[0].iov_base = ((u_char *)&conn->conn_header) + 176206972Srwatson conn->conn_header_sent; 177206972Srwatson iov[0].iov_len = header_left; 178206972Srwatson iov[1].iov_base = buffer; 179206972Srwatson iov[1].iov_len = min(sizeof(buffer), payload_len); 180206972Srwatson len = writev(conn->conn_fd, iov, 2); 181189623Srwatson if (len < 0) { 182189623Srwatson tcpp_client_closeconn(conn); 183189623Srwatson err(-1, "tcpp_client_handleconn: header write"); 184189623Srwatson } 185189623Srwatson if (len == 0) { 186189623Srwatson tcpp_client_closeconn(conn); 187189623Srwatson errx(-1, "tcpp_client_handleconn: header write " 188189623Srwatson "premature EOF"); 189189623Srwatson } 190206972Srwatson if (len > header_left) { 191206972Srwatson conn->conn_data_sent += (len - header_left); 192206972Srwatson conn->conn_header_sent += header_left; 193206972Srwatson } else 194206972Srwatson conn->conn_header_sent += len; 195189623Srwatson } else { 196189623Srwatson len = write(conn->conn_fd, buffer, min(sizeof(buffer), 197206972Srwatson payload_len - conn->conn_data_sent)); 198189623Srwatson if (len < 0) { 199189623Srwatson tcpp_client_closeconn(conn); 200189623Srwatson err(-1, "tcpp_client_handleconn: data write"); 201189623Srwatson } 202189623Srwatson if (len == 0) { 203189623Srwatson tcpp_client_closeconn(conn); 204189623Srwatson errx(-1, "tcpp_client_handleconn: data write: " 205189623Srwatson "premature EOF"); 206189623Srwatson } 207189623Srwatson conn->conn_data_sent += len; 208189623Srwatson } 209206972Srwatson if (conn->conn_data_sent >= payload_len) { 210206972Srwatson /* 211206972Srwatson * All is well. 212206972Srwatson */ 213206972Srwatson tcpp_client_closeconn(conn); 214206972Srwatson } 215189623Srwatson} 216189623Srwatson 217189623Srwatsonstatic void 218189623Srwatsontcpp_client_worker(int workernum) 219189623Srwatson{ 220189623Srwatson struct kevent *kev_array; 221189623Srwatson int i, numevents, kev_bytes; 222189623Srwatson#if defined(CPU_SETSIZE) && 0 223189623Srwatson cpu_set_t mask; 224189623Srwatson int ncpus; 225189623Srwatson size_t len; 226189623Srwatson 227189623Srwatson len = sizeof(ncpus); 228189623Srwatson if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0) 229189623Srwatson err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS); 230189623Srwatson if (len != sizeof(ncpus)) 231189623Srwatson errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS, 232189623Srwatson (intmax_t)len); 233189623Srwatson 234189623Srwatson CPU_ZERO(&mask); 235189623Srwatson CPU_SET(workernum % ncpus, &mask); 236189623Srwatson if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0) 237189623Srwatson err(-1, "sched_setaffinity"); 238189623Srwatson#endif 239189623Srwatson setproctitle("tcpp_client %d", workernum); 240189623Srwatson 241189623Srwatson /* 242189623Srwatson * Add the worker number to the remote port. 243189623Srwatson */ 244189623Srwatson remoteip.sin_port = htons(rflag + workernum); 245189623Srwatson 246189623Srwatson kev_bytes = sizeof(*kev_array) * mflag; 247189623Srwatson kev_array = malloc(kev_bytes); 248189623Srwatson if (kev_array == NULL) 249189623Srwatson err(-1, "malloc"); 250189623Srwatson bzero(kev_array, kev_bytes); 251189623Srwatson 252189623Srwatson kq = kqueue(); 253189623Srwatson if (kq < 0) 254189623Srwatson err(-1, "kqueue"); 255189623Srwatson 256189623Srwatson while (finished < tflag) { 257189623Srwatson while ((started - finished < mflag) && (started < tflag)) 258189623Srwatson (void)tcpp_client_newconn(); 259189623Srwatson numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL); 260189623Srwatson if (numevents < 0) 261189623Srwatson err(-1, "kevent"); 262189623Srwatson if (numevents > mflag) 263189623Srwatson errx(-1, "kevent: %d", numevents); 264189623Srwatson for (i = 0; i < numevents; i++) 265189623Srwatson tcpp_client_handleconn(&kev_array[i]); 266189623Srwatson } 267189623Srwatson /* printf("Worker %d done - %d finished\n", workernum, finished); */ 268189623Srwatson} 269189623Srwatson 270189623Srwatsonvoid 271189623Srwatsontcpp_client(void) 272189623Srwatson{ 273189623Srwatson struct timespec ts_start, ts_finish; 274189623Srwatson long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES]; 275189623Srwatson long ticks; 276189623Srwatson size_t size; 277189623Srwatson pid_t pid; 278189623Srwatson int i, failed, status; 279189623Srwatson 280206972Srwatson if (bflag < sizeof(struct tcpp_header)) 281206972Srwatson errx(-1, "Can't use -b less than %zu\n", 282206972Srwatson sizeof(struct tcpp_header)); 283206972Srwatson payload_len = bflag - sizeof(struct tcpp_header); 284206972Srwatson 285189623Srwatson pid_list = malloc(sizeof(*pid_list) * pflag); 286189623Srwatson if (pid_list == NULL) 287189623Srwatson err(-1, "malloc pid_list"); 288189623Srwatson bzero(pid_list, sizeof(*pid_list) * pflag); 289189623Srwatson 290189623Srwatson /* 291189623Srwatson * Start workers. 292189623Srwatson */ 293189623Srwatson size = sizeof(cp_time_start); 294189623Srwatson if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0) 295189623Srwatson < 0) 296189623Srwatson err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME); 297189623Srwatson if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0) 298189623Srwatson err(-1, "clock_gettime"); 299189623Srwatson for (i = 0; i < pflag; i++) { 300189623Srwatson pid = fork(); 301189623Srwatson if (pid < 0) { 302189623Srwatson warn("fork"); 303189623Srwatson for (i = 0; i < pflag; i++) { 304189623Srwatson if (pid_list[i] != 0) 305189623Srwatson (void)kill(pid_list[i], SIGKILL); 306189623Srwatson } 307189623Srwatson exit(-1); 308189623Srwatson } 309189623Srwatson if (pid == 0) { 310189623Srwatson tcpp_client_worker(i); 311189623Srwatson exit(0); 312189623Srwatson } 313189623Srwatson pid_list[i] = pid; 314189623Srwatson } 315189623Srwatson 316189623Srwatson /* 317189623Srwatson * GC workers. 318189623Srwatson */ 319189623Srwatson failed = 0; 320189623Srwatson for (i = 0; i < pflag; i++) { 321189623Srwatson if (pid_list[i] != 0) { 322189623Srwatson while (waitpid(pid_list[i], &status, 0) != pid_list[i]); 323189623Srwatson if (WEXITSTATUS(status) != 0) 324189623Srwatson failed = 1; 325189623Srwatson } 326189623Srwatson } 327189623Srwatson if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0) 328189623Srwatson err(-1, "clock_gettime"); 329189623Srwatson size = sizeof(cp_time_finish); 330189623Srwatson if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0) 331189623Srwatson < 0) 332189623Srwatson err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME); 333189623Srwatson timespecsub(&ts_finish, &ts_start); 334189623Srwatson 335189623Srwatson if (failed) 336189623Srwatson errx(-1, "Too many errors"); 337189623Srwatson 338189623Srwatson printf("%jd bytes transferred in %jd.%09jd seconds\n", 339189623Srwatson (bflag * tflag * pflag), (intmax_t)ts_finish.tv_sec, 340189623Srwatson (intmax_t)(ts_finish.tv_nsec)); 341189623Srwatson 342189623Srwatson if (Tflag) 343189623Srwatson printf("%d procs ", pflag); 344189623Srwatson if (Cflag) { 345189623Srwatson printf("%f cps%s", (double)(pflag * tflag)/ 346189623Srwatson (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9), 347189623Srwatson Tflag ? " " : "\n"); 348189623Srwatson } else { 349189623Srwatson printf("%f Gbps%s", (double)(bflag * tflag * pflag * 8) / 350189623Srwatson (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9, 351189623Srwatson Tflag ? " " : "\n"); 352189623Srwatson } 353189623Srwatson if (Tflag) { 354189623Srwatson ticks = 0; 355189623Srwatson for (i = 0; i < CPUSTATES; i++) { 356189623Srwatson cp_time_finish[i] -= cp_time_start[i]; 357189623Srwatson ticks += cp_time_finish[i]; 358189623Srwatson } 359189623Srwatson printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu " 360189623Srwatson "idle%% %lu\n", 361189623Srwatson (100 * cp_time_finish[CP_USER]) / ticks, 362189623Srwatson (100 * cp_time_finish[CP_NICE]) / ticks, 363189623Srwatson (100 * cp_time_finish[CP_SYS]) / ticks, 364189623Srwatson (100 * cp_time_finish[CP_INTR]) / ticks, 365189623Srwatson (100 * cp_time_finish[CP_IDLE]) / ticks); 366189623Srwatson } 367189623Srwatson} 368