tcpp_client.c revision 206972
1189623Srwatson/*-
2189623Srwatson * Copyright (c) 2008-2009 Robert N. M. Watson
3189623Srwatson * All rights reserved.
4189623Srwatson *
5189623Srwatson * Redistribution and use in source and binary forms, with or without
6189623Srwatson * modification, are permitted provided that the following conditions
7189623Srwatson * are met:
8189623Srwatson * 1. Redistributions of source code must retain the above copyright
9189623Srwatson *    notice, this list of conditions and the following disclaimer.
10189623Srwatson * 2. Redistributions in binary form must reproduce the above copyright
11189623Srwatson *    notice, this list of conditions and the following disclaimer in the
12189623Srwatson *    documentation and/or other materials provided with the distribution.
13189623Srwatson *
14189623Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15189623Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16189623Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17189623Srwatson * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18189623Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19189623Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20189623Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21189623Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22189623Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23189623Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24189623Srwatson * SUCH DAMAGE.
25189623Srwatson *
26189623Srwatson * $FreeBSD: head/tools/tools/netrate/tcpp/tcpp_client.c 206972 2010-04-21 00:52:55Z rwatson $
27189623Srwatson */
28189623Srwatson
29189623Srwatson#include <sys/types.h>
30189623Srwatson#include <sys/event.h>
31189623Srwatson#include <sys/resource.h>
32189623Srwatson#include <sys/sched.h>
33189623Srwatson#include <sys/socket.h>
34189623Srwatson#include <sys/sysctl.h>
35189623Srwatson#include <sys/time.h>
36206972Srwatson#include <sys/uio.h>
37189623Srwatson#include <sys/wait.h>
38189623Srwatson
39189623Srwatson#include <netinet/in.h>
40206972Srwatson#include <netinet/tcp.h>
41189623Srwatson
42189623Srwatson#include <err.h>
43189623Srwatson#include <errno.h>
44189623Srwatson#include <fcntl.h>
45189623Srwatson#include <inttypes.h>
46189623Srwatson#include <signal.h>
47189623Srwatson#include <stdio.h>
48189623Srwatson#include <stdlib.h>
49189623Srwatson#include <string.h>
50189623Srwatson#include <unistd.h>
51189623Srwatson
52189623Srwatson#include "tcpp.h"
53189623Srwatson
/*
 * Smaller of two values.  Each argument and the whole expansion are
 * parenthesized so that operands containing lower-precedence operators
 * (for example `a & b') group the way the caller wrote them.  The selected
 * argument is evaluated twice, so do not pass expressions with side effects.
 */
#define	min(x, y)	((x) < (y) ? (x) : (y))
55189623Srwatson
/*
 * In-place timespec subtraction: *vvp = *vvp - *uvp.  If the nanosecond
 * difference comes out negative, one second is borrowed from tv_sec and
 * added back to tv_nsec as 1000000000 nanoseconds.
 */
#define timespecsub(vvp, uvp)						\
	do {								\
		(vvp)->tv_nsec -= (uvp)->tv_nsec;			\
		(vvp)->tv_sec -= (uvp)->tv_sec;				\
		if ((vvp)->tv_nsec < 0) {				\
			(vvp)->tv_nsec += 1000000000;			\
			(vvp)->tv_sec--;				\
		}							\
	} while (0)
65189623Srwatson
66189623Srwatson
67189623Srwatson/*
68189623Srwatson * Gist of each client worker: build up to mflag connections at a time, and
69189623Srwatson * pump data in to them somewhat fairly until tflag connections have been
70189623Srwatson * completed.
71189623Srwatson */
72189623Srwatson#define	CONNECTION_MAGIC	0x87a3f56e
73189623Srwatsonstruct connection {
74189623Srwatson	uint32_t	conn_magic;		/* Just magic. */
75189623Srwatson	int		conn_fd;
76189623Srwatson	struct tcpp_header	conn_header;	/* Header buffer. */
77189623Srwatson	u_int		conn_header_sent;	/* Header bytes sent. */
78189623Srwatson	u_int64_t	conn_data_sent;		/* Data bytes sent.*/
79189623Srwatson};
80189623Srwatson
/* Shared source of payload bytes; its contents are never inspected. */
static unsigned char		 buffer[256 * 1024];

/* Parent-side bookkeeping: one slot per forked worker. */
static pid_t			*pid_list;

/* Per-worker state (each worker is a separate process after fork). */
static int			 kq;		/* Worker's kqueue descriptor. */
static int			 started;	/* Connections started so far. */
static int			 finished;	/* Connections finished so far. */
static int			 counter;	/* Local IP rotation offset. */

/* Bytes to send after the header on every connection. */
static uint64_t			 payload_len;
88189623Srwatson
89189623Srwatsonstatic struct connection *
90189623Srwatsontcpp_client_newconn(void)
91189623Srwatson{
92189623Srwatson	struct sockaddr_in sin;
93189623Srwatson	struct connection *conn;
94189623Srwatson	struct kevent kev;
95189623Srwatson	int fd, i;
96189623Srwatson
97189623Srwatson	/*
98189623Srwatson	 * Spread load over available IPs, roating through them as we go.  No
99189623Srwatson	 * attempt to localize IPs to particular workers.
100189623Srwatson	 */
101189623Srwatson	sin = localipbase;
102189623Srwatson	sin.sin_addr.s_addr = htonl(ntohl(localipbase.sin_addr.s_addr) +
103189623Srwatson	    (counter++ % Mflag));
104189623Srwatson
105189623Srwatson	fd = socket(PF_INET, SOCK_STREAM, 0);
106189623Srwatson	if (fd < 0)
107189623Srwatson		err(-1, "socket");
108189623Srwatson
109189623Srwatson	if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
110189623Srwatson		err(-1, "fcntl");
111189623Srwatson
112189623Srwatson	i = 1;
113189623Srwatson	if (setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i)) < 0)
114189623Srwatson		err(-1, "setsockopt");
115206972Srwatson	i = 1;
116206972Srwatson	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i)) < 0)
117206972Srwatson		err(-1, "setsockopt");
118189623Srwatson#if 0
119189623Srwatson	i = 1;
120189623Srwatson	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &i, sizeof(i)) < 0)
121189623Srwatson		err(-1, "setsockopt");
122189623Srwatson#endif
123189623Srwatson
124189623Srwatson	if (lflag) {
125189623Srwatson		if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
126189623Srwatson			err(-1, "bind");
127189623Srwatson	}
128189623Srwatson
129189623Srwatson	if (connect(fd, (struct sockaddr *)&remoteip, sizeof(remoteip)) < 0 &&
130189623Srwatson	    errno != EINPROGRESS)
131189623Srwatson		err(-1, "connect");
132189623Srwatson
133189623Srwatson	conn = malloc(sizeof(*conn));
134189623Srwatson	if (conn == NULL)
135189623Srwatson		return (NULL);
136189623Srwatson	bzero(conn, sizeof(*conn));
137189623Srwatson	conn->conn_magic = CONNECTION_MAGIC;
138189623Srwatson	conn->conn_fd = fd;
139189623Srwatson	conn->conn_header.th_magic = TCPP_MAGIC;
140206972Srwatson	conn->conn_header.th_len = payload_len;
141189623Srwatson	tcpp_header_encode(&conn->conn_header);
142189623Srwatson
143189623Srwatson	EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, conn);
144189623Srwatson	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
145189623Srwatson		err(-1, "newconn kevent");
146189623Srwatson
147189623Srwatson	started++;
148189623Srwatson	return (conn);
149189623Srwatson}
150189623Srwatson
151189623Srwatsonstatic void
152189623Srwatsontcpp_client_closeconn(struct connection *conn)
153189623Srwatson{
154189623Srwatson
155189623Srwatson	close(conn->conn_fd);
156189623Srwatson	bzero(conn, sizeof(*conn));
157189623Srwatson	free(conn);
158189623Srwatson	finished++;
159189623Srwatson}
160189623Srwatson
161189623Srwatsonstatic void
162189623Srwatsontcpp_client_handleconn(struct kevent *kev)
163189623Srwatson{
164189623Srwatson	struct connection *conn;
165206972Srwatson	struct iovec iov[2];
166206972Srwatson	ssize_t len, header_left;
167189623Srwatson
168189623Srwatson	conn = kev->udata;
169189623Srwatson	if (conn->conn_magic != CONNECTION_MAGIC)
170189623Srwatson		errx(-1, "tcpp_client_handleconn: magic");
171189623Srwatson
172189623Srwatson	if (conn->conn_header_sent < sizeof(conn->conn_header)) {
173206972Srwatson		header_left = sizeof(conn->conn_header) -
174206972Srwatson		    conn->conn_header_sent;
175206972Srwatson		iov[0].iov_base = ((u_char *)&conn->conn_header) +
176206972Srwatson		    conn->conn_header_sent;
177206972Srwatson		iov[0].iov_len = header_left;
178206972Srwatson		iov[1].iov_base = buffer;
179206972Srwatson		iov[1].iov_len = min(sizeof(buffer), payload_len);
180206972Srwatson		len = writev(conn->conn_fd, iov, 2);
181189623Srwatson		if (len < 0) {
182189623Srwatson			tcpp_client_closeconn(conn);
183189623Srwatson			err(-1, "tcpp_client_handleconn: header write");
184189623Srwatson		}
185189623Srwatson		if (len == 0) {
186189623Srwatson			tcpp_client_closeconn(conn);
187189623Srwatson			errx(-1, "tcpp_client_handleconn: header write "
188189623Srwatson			    "premature EOF");
189189623Srwatson		}
190206972Srwatson		if (len > header_left) {
191206972Srwatson			conn->conn_data_sent += (len - header_left);
192206972Srwatson			conn->conn_header_sent += header_left;
193206972Srwatson		} else
194206972Srwatson			conn->conn_header_sent += len;
195189623Srwatson	} else {
196189623Srwatson		len = write(conn->conn_fd, buffer, min(sizeof(buffer),
197206972Srwatson		    payload_len - conn->conn_data_sent));
198189623Srwatson		if (len < 0) {
199189623Srwatson			tcpp_client_closeconn(conn);
200189623Srwatson			err(-1, "tcpp_client_handleconn: data write");
201189623Srwatson		}
202189623Srwatson		if (len == 0) {
203189623Srwatson			tcpp_client_closeconn(conn);
204189623Srwatson			errx(-1, "tcpp_client_handleconn: data write: "
205189623Srwatson			    "premature EOF");
206189623Srwatson		}
207189623Srwatson		conn->conn_data_sent += len;
208189623Srwatson	}
209206972Srwatson	if (conn->conn_data_sent >= payload_len) {
210206972Srwatson		/*
211206972Srwatson		 * All is well.
212206972Srwatson		 */
213206972Srwatson		tcpp_client_closeconn(conn);
214206972Srwatson	}
215189623Srwatson}
216189623Srwatson
217189623Srwatsonstatic void
218189623Srwatsontcpp_client_worker(int workernum)
219189623Srwatson{
220189623Srwatson	struct kevent *kev_array;
221189623Srwatson	int i, numevents, kev_bytes;
222189623Srwatson#if defined(CPU_SETSIZE) && 0
223189623Srwatson	cpu_set_t mask;
224189623Srwatson	int ncpus;
225189623Srwatson	size_t len;
226189623Srwatson
227189623Srwatson	len = sizeof(ncpus);
228189623Srwatson	if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
229189623Srwatson		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
230189623Srwatson	if (len != sizeof(ncpus))
231189623Srwatson		errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
232189623Srwatson		    (intmax_t)len);
233189623Srwatson
234189623Srwatson	CPU_ZERO(&mask);
235189623Srwatson	CPU_SET(workernum % ncpus, &mask);
236189623Srwatson	if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
237189623Srwatson		err(-1, "sched_setaffinity");
238189623Srwatson#endif
239189623Srwatson	setproctitle("tcpp_client %d", workernum);
240189623Srwatson
241189623Srwatson	/*
242189623Srwatson	 * Add the worker number to the remote port.
243189623Srwatson	 */
244189623Srwatson	remoteip.sin_port = htons(rflag + workernum);
245189623Srwatson
246189623Srwatson	kev_bytes = sizeof(*kev_array) * mflag;
247189623Srwatson	kev_array = malloc(kev_bytes);
248189623Srwatson	if (kev_array == NULL)
249189623Srwatson		err(-1, "malloc");
250189623Srwatson	bzero(kev_array, kev_bytes);
251189623Srwatson
252189623Srwatson	kq = kqueue();
253189623Srwatson	if (kq < 0)
254189623Srwatson		err(-1, "kqueue");
255189623Srwatson
256189623Srwatson	while (finished < tflag) {
257189623Srwatson		while ((started - finished < mflag) && (started < tflag))
258189623Srwatson			(void)tcpp_client_newconn();
259189623Srwatson		numevents = kevent(kq, NULL, 0, kev_array, mflag, NULL);
260189623Srwatson		if (numevents < 0)
261189623Srwatson			err(-1, "kevent");
262189623Srwatson		if (numevents > mflag)
263189623Srwatson			errx(-1, "kevent: %d", numevents);
264189623Srwatson		for (i = 0; i < numevents; i++)
265189623Srwatson			tcpp_client_handleconn(&kev_array[i]);
266189623Srwatson	}
267189623Srwatson	/* printf("Worker %d done - %d finished\n", workernum, finished); */
268189623Srwatson}
269189623Srwatson
270189623Srwatsonvoid
271189623Srwatsontcpp_client(void)
272189623Srwatson{
273189623Srwatson	struct timespec ts_start, ts_finish;
274189623Srwatson	long cp_time_start[CPUSTATES], cp_time_finish[CPUSTATES];
275189623Srwatson	long ticks;
276189623Srwatson	size_t size;
277189623Srwatson	pid_t pid;
278189623Srwatson	int i, failed, status;
279189623Srwatson
280206972Srwatson	if (bflag < sizeof(struct tcpp_header))
281206972Srwatson		errx(-1, "Can't use -b less than %zu\n",
282206972Srwatson		   sizeof(struct tcpp_header));
283206972Srwatson	payload_len = bflag - sizeof(struct tcpp_header);
284206972Srwatson
285189623Srwatson	pid_list = malloc(sizeof(*pid_list) * pflag);
286189623Srwatson	if (pid_list == NULL)
287189623Srwatson		err(-1, "malloc pid_list");
288189623Srwatson	bzero(pid_list, sizeof(*pid_list) * pflag);
289189623Srwatson
290189623Srwatson	/*
291189623Srwatson	 * Start workers.
292189623Srwatson	 */
293189623Srwatson	size = sizeof(cp_time_start);
294189623Srwatson	if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_start, &size, NULL, 0)
295189623Srwatson	    < 0)
296189623Srwatson		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
297189623Srwatson	if (clock_gettime(CLOCK_REALTIME, &ts_start) < 0)
298189623Srwatson		err(-1, "clock_gettime");
299189623Srwatson	for (i = 0; i < pflag; i++) {
300189623Srwatson		pid = fork();
301189623Srwatson		if (pid < 0) {
302189623Srwatson			warn("fork");
303189623Srwatson			for (i = 0; i < pflag; i++) {
304189623Srwatson				if (pid_list[i] != 0)
305189623Srwatson					(void)kill(pid_list[i], SIGKILL);
306189623Srwatson			}
307189623Srwatson			exit(-1);
308189623Srwatson		}
309189623Srwatson		if (pid == 0) {
310189623Srwatson			tcpp_client_worker(i);
311189623Srwatson			exit(0);
312189623Srwatson		}
313189623Srwatson		pid_list[i] = pid;
314189623Srwatson	}
315189623Srwatson
316189623Srwatson	/*
317189623Srwatson	 * GC workers.
318189623Srwatson	 */
319189623Srwatson	failed = 0;
320189623Srwatson	for (i = 0; i < pflag; i++) {
321189623Srwatson		if (pid_list[i] != 0) {
322189623Srwatson			while (waitpid(pid_list[i], &status, 0) != pid_list[i]);
323189623Srwatson			if (WEXITSTATUS(status) != 0)
324189623Srwatson				failed = 1;
325189623Srwatson		}
326189623Srwatson	}
327189623Srwatson	if (clock_gettime(CLOCK_REALTIME, &ts_finish) < 0)
328189623Srwatson		err(-1, "clock_gettime");
329189623Srwatson	size = sizeof(cp_time_finish);
330189623Srwatson	if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_finish, &size, NULL, 0)
331189623Srwatson	    < 0)
332189623Srwatson		err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
333189623Srwatson	timespecsub(&ts_finish, &ts_start);
334189623Srwatson
335189623Srwatson	if (failed)
336189623Srwatson		errx(-1, "Too many errors");
337189623Srwatson
338189623Srwatson	printf("%jd bytes transferred in %jd.%09jd seconds\n",
339189623Srwatson	    (bflag * tflag * pflag), (intmax_t)ts_finish.tv_sec,
340189623Srwatson	    (intmax_t)(ts_finish.tv_nsec));
341189623Srwatson
342189623Srwatson	if (Tflag)
343189623Srwatson		printf("%d procs ", pflag);
344189623Srwatson	if (Cflag) {
345189623Srwatson		printf("%f cps%s", (double)(pflag * tflag)/
346189623Srwatson		    (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9),
347189623Srwatson		    Tflag ? " " : "\n");
348189623Srwatson	} else {
349189623Srwatson		printf("%f Gbps%s", (double)(bflag * tflag * pflag * 8) /
350189623Srwatson		    (ts_finish.tv_sec + ts_finish.tv_nsec * 1e-9) * 1e-9,
351189623Srwatson		    Tflag ? " " : "\n");
352189623Srwatson	}
353189623Srwatson	if (Tflag) {
354189623Srwatson		ticks = 0;
355189623Srwatson		for (i = 0; i < CPUSTATES; i++) {
356189623Srwatson			cp_time_finish[i] -= cp_time_start[i];
357189623Srwatson			ticks += cp_time_finish[i];
358189623Srwatson		}
359189623Srwatson		printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu "
360189623Srwatson		    "idle%% %lu\n",
361189623Srwatson		    (100 * cp_time_finish[CP_USER]) / ticks,
362189623Srwatson		    (100 * cp_time_finish[CP_NICE]) / ticks,
363189623Srwatson		    (100 * cp_time_finish[CP_SYS]) / ticks,
364189623Srwatson		    (100 * cp_time_finish[CP_INTR]) / ticks,
365189623Srwatson		    (100 * cp_time_finish[CP_IDLE]) / ticks);
366189623Srwatson	}
367189623Srwatson}
368