1189623Srwatson/*-
2189623Srwatson * Copyright (c) 2008-2009 Robert N. M. Watson
3189623Srwatson * All rights reserved.
4189623Srwatson *
5189623Srwatson * Redistribution and use in source and binary forms, with or without
6189623Srwatson * modification, are permitted provided that the following conditions
7189623Srwatson * are met:
8189623Srwatson * 1. Redistributions of source code must retain the above copyright
9189623Srwatson *    notice, this list of conditions and the following disclaimer.
10189623Srwatson * 2. Redistributions in binary form must reproduce the above copyright
11189623Srwatson *    notice, this list of conditions and the following disclaimer in the
12189623Srwatson *    documentation and/or other materials provided with the distribution.
13189623Srwatson *
14189623Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15189623Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16189623Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17189623Srwatson * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18189623Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19189623Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20189623Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21189623Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22189623Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23189623Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24189623Srwatson * SUCH DAMAGE.
25189623Srwatson *
26189623Srwatson * $FreeBSD$
27189623Srwatson */
28189623Srwatson
#include <sys/types.h>
#include <sys/endian.h>
#include <sys/event.h>
#include <sys/resource.h>
#include <sys/sched.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/time.h>
#include <sys/wait.h>

#include <netinet/in.h>
#include <netinet/tcp.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "tcpp.h"
52189623Srwatson
/*
 * Server side -- create a pool of processes, each listening on its own TCP
 * port number for new connections.  The first 8 bytes of each connection
 * will be a network byte order length, then there will be that number of
 * bytes of data.  We use non-blocking sockets with kqueue to avoid the
 * overhead of threading or more than one process per processor, which makes
 * things a bit awkward when dealing with data we care about.  As such, we
 * read into a small character buffer which we then convert to a length once
 * we have all the data.
 */
63189623Srwatson#define	CONNECTION_MAGIC	0x6392af27
64189623Srwatsonstruct connection {
65189623Srwatson	uint32_t	conn_magic;		/* Just magic. */
66189623Srwatson	int		conn_fd;
67189623Srwatson	struct tcpp_header	conn_header;	/* Header buffer. */
68189623Srwatson	u_int		conn_header_len;	/* Bytes so far. */
69189623Srwatson	u_int64_t	conn_data_len;		/* How much to sink. */
70189623Srwatson	u_int64_t	conn_data_received;	/* How much so far. */
71189623Srwatson};
72189623Srwatson
/* PIDs of forked workers, filled in by the parent in tcpp_server(). */
static pid_t			*pid_list;
/* kqueue descriptor, created by each worker in tcpp_server_worker(). */
static int			 kq;
75189623Srwatson
/*
 * Accept one pending connection on listen_fd, make it non-blocking, allocate
 * its struct connection and register it for read events on the worker's
 * kqueue.
 *
 * Returns the new connection, or NULL if accept() or the allocation failed
 * (a warning is printed and no descriptor is left open).  Failure of fcntl()
 * or kevent() is treated as fatal and terminates the process via err().
 */
static struct connection *
tcpp_server_newconn(int listen_fd)
{
	struct connection *conn;
	struct kevent kev;
	int fd;

	fd = accept(listen_fd, NULL, NULL);
	if (fd < 0) {
		warn("accept");
		return (NULL);
	}

	if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
		err(-1, "fcntl");

	/* calloc() hands back zeroed state, so only non-zero fields are set. */
	conn = calloc(1, sizeof(*conn));
	if (conn == NULL) {
		/* Report first: close() may overwrite errno. */
		warn("calloc");
		/* Don't leak the accepted socket when we cannot track it. */
		(void)close(fd);
		return (NULL);
	}
	conn->conn_magic = CONNECTION_MAGIC;
	conn->conn_fd = fd;

	/*
	 * Register to read on the socket, and set our conn pointer as the
	 * udata so we can find it quickly in the future.
	 */
	EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, conn);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
		err(-1, "kevent");

	return (conn);
}
109189623Srwatson
110189623Srwatsonstatic void
111189623Srwatsontcpp_server_closeconn(struct connection *conn)
112189623Srwatson{
113189623Srwatson
114189623Srwatson	/*
115189623Srwatson	 * Kqueue cleans up after itself once we close the socket, and since
116189623Srwatson	 * we are processing only one kevent at a time, we don't need to
117189623Srwatson	 * worry about watching out for future kevents referring to it.
118189623Srwatson	 *
119189623Srwatson	 * ... right?
120189623Srwatson	 */
121189623Srwatson	close(conn->conn_fd);
122189623Srwatson	bzero(conn, sizeof(*conn));
123189623Srwatson	free(conn);
124189623Srwatson}
125189623Srwatson
/* Scratch space that payload bytes are read into and then discarded. */
static u_char buffer[256*1024];

/*
 * Service one read event for a connection, located through the kevent's
 * udata pointer.  Each call performs at most one read():
 *
 *  - while the header is incomplete, read more header bytes; once it is
 *    complete, decode it and verify its magic;
 *  - afterwards, sink up to one buffer of payload and close the connection
 *    once exactly th_len bytes have arrived.
 *
 * Any read error, premature EOF, bad magic or surplus data closes the
 * connection with a warning.  A corrupted connection magic is fatal.
 */
static void
tcpp_server_handleconn(struct kevent *kev)
{
	struct connection *c;
	u_char *hdr_pos;
	size_t hdr_left;
	ssize_t nread;

	c = kev->udata;
	if (c->conn_magic != CONNECTION_MAGIC)
		errx(-1, "tcpp_server_handleconn: magic");

	if (c->conn_header_len >= sizeof(c->conn_header)) {
		/*
		 * Payload phase.  Read only a single buffer's worth per
		 * event so that other connections get serviced too.
		 */
		nread = read(c->conn_fd, buffer, sizeof(buffer));
		if (nread < 0) {
			warn("tcpp_server_handleconn: data bad read");
			tcpp_server_closeconn(c);
			return;
		}
		if (nread == 0 &&
		    c->conn_data_received < c->conn_header.th_len) {
			warnx("tcpp_server_handleconn: data premature eof");
			tcpp_server_closeconn(c);
			return;
		}
		c->conn_data_received += nread;
		if (c->conn_data_received >= c->conn_header.th_len) {
			/* Exactly th_len bytes is success; more is an error. */
			if (c->conn_data_received > c->conn_header.th_len)
				warnx("tcpp_server_handleconn: too much data");
			tcpp_server_closeconn(c);
		}
		return;
	}

	/* Header phase: resume filling the header where we left off. */
	hdr_pos = (u_char *)&c->conn_header + c->conn_header_len;
	hdr_left = sizeof(c->conn_header) - c->conn_header_len;
	nread = read(c->conn_fd, hdr_pos, hdr_left);
	if (nread < 0) {
		warn("tcpp_server_handleconn: header read");
		tcpp_server_closeconn(c);
		return;
	}
	if (nread == 0) {
		warnx("tcpp_server_handleconn: header premature eof");
		tcpp_server_closeconn(c);
		return;
	}
	c->conn_header_len += nread;
	if (c->conn_header_len != sizeof(c->conn_header))
		return;

	/* The header is complete; decode it in place and validate it. */
	tcpp_header_decode(&c->conn_header);
	if (c->conn_header.th_magic != TCPP_MAGIC) {
		warnx("tcpp_server_handleconn: bad magic");
		tcpp_server_closeconn(c);
	}
}
192189623Srwatson
/*
 * Body of one worker process.  Creates a non-blocking listen socket bound to
 * port (rflag + workernum), creates this process's kqueue, and then loops
 * dispatching events: events on the listen socket accept a new connection,
 * all others are handed to tcpp_server_handleconn().
 *
 * Setup failures are fatal (err()).  The event loop blocks without a timeout,
 * so it ends only when kevent() stops returning events; a kevent() error is
 * reported before the worker announces that it is done and returns.
 */
static void
tcpp_server_worker(int workernum)
{
	int i, listen_sock, numevents;
	struct kevent kev, *kev_array;
	/* Compiled out by the "&& 0": CPU pinning is currently disabled. */
#if defined(CPU_SETSIZE) && 0
	cpu_set_t mask;
	int ncpus;
	ssize_t len;

	if (Pflag) {
		len = sizeof(ncpus);
		if (sysctlbyname(SYSCTLNAME_CPUS, &ncpus, &len, NULL, 0) < 0)
			err(-1, "sysctlbyname: %s", SYSCTLNAME_CPUS);
		if (len != sizeof(ncpus))
			errx(-1, "sysctlbyname: %s: len %jd", SYSCTLNAME_CPUS,
			    (intmax_t)len);

		CPU_ZERO(&mask);
		CPU_SET(workernum % ncpus, &mask);
		if (sched_setaffinity(0, CPU_SETSIZE, &mask) < 0)
			err(-1, "sched_setaffinity");
	}
#endif
	setproctitle("tcpp_server %d", workernum);

	/*
	 * Allow an extra kevent for the listen socket.  calloc() zeroes the
	 * array and rejects a count whose byte size would overflow, rather
	 * than silently truncating it as an int byte count could.
	 */
	kev_array = calloc((size_t)mflag + 1, sizeof(*kev_array));
	if (kev_array == NULL)
		err(-1, "malloc");

	/* XXXRW: Want to set and pin the CPU here. */

	/*
	 * Add the worker number to the local port.
	 */
	localipbase.sin_port = htons(rflag + workernum);

	listen_sock = socket(PF_INET, SOCK_STREAM, 0);
	if (listen_sock < 0)
		err(-1, "socket");
	i = 1;
	if (setsockopt(listen_sock, SOL_SOCKET, SO_NOSIGPIPE, &i, sizeof(i))
	    < 0)
		err(-1, "setsockopt");
	i = 1;
	if (setsockopt(listen_sock, SOL_SOCKET, SO_REUSEPORT, &i, sizeof(i))
	    < 0)
		err(-1, "setsockopt");
	i = 1;
	if (setsockopt(listen_sock, IPPROTO_TCP, TCP_NODELAY, &i, sizeof(i))
	    < 0)
		err(-1, "setsockopt");
	if (bind(listen_sock, (struct sockaddr *)&localipbase,
	    sizeof(localipbase)) < 0)
		err(-1, "bind");
	if (listen(listen_sock, 16384) < 0)
		err(-1, "listen");
	if (fcntl(listen_sock, F_SETFL, O_NONBLOCK) < 0)
		err(-1, "fcntl");

	kq = kqueue();
	if (kq < 0)
		err(-1, "kqueue");

	/* The listen socket is registered with NULL udata. */
	EV_SET(&kev, listen_sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, NULL, 0, NULL) < 0)
		err(-1, "kevent");

	while ((numevents = kevent(kq, NULL, 0, kev_array, mflag + 1, NULL))
	    > 0) {
		for (i = 0; i < numevents; i++) {
			if (kev_array[i].ident == (uintptr_t)listen_sock)
				(void)tcpp_server_newconn(listen_sock);
			else
				tcpp_server_handleconn(&kev_array[i]);
		}
	}
	/* Say why the loop ended instead of failing silently. */
	if (numevents < 0)
		warn("kevent");

	free(kev_array);
	(void)close(kq);
	(void)close(listen_sock);
	printf("Worker %d done\n", workernum);
}
276189623Srwatson
/*
 * Server entry point: fork pflag worker processes, each running
 * tcpp_server_worker() with its index as the worker number, then wait for
 * all of them to exit.
 *
 * If a fork() fails, every worker started so far is killed with SIGKILL and
 * the process exits with status -1.  Allocation failure is fatal via err().
 */
void
tcpp_server(void)
{
#if 0
	long cp_time_last[CPUSTATES], cp_time_now[CPUSTATES], ticks;
	size_t size;
#endif
	pid_t pid;
	int i, j;

	/* Zeroed so that a slot value of 0 means "no worker recorded". */
	pid_list = calloc(pflag, sizeof(*pid_list));
	if (pid_list == NULL)
		err(-1, "malloc pid_list");

	/*
	 * Start workers.
	 */
	for (i = 0; i < pflag; i++) {
		pid = fork();
		if (pid < 0) {
			warn("fork");
			/* Separate index: don't clobber the outer counter. */
			for (j = 0; j < pflag; j++) {
				if (pid_list[j] != 0)
					(void)kill(pid_list[j], SIGKILL);
			}
			exit(-1);
		}
		if (pid == 0) {
			/* Child: run the worker and never return to the loop. */
			tcpp_server_worker(i);
			exit(0);
		}
		pid_list[i] = pid;
	}

	/* Disabled periodic CPU-usage reporting, kept for reference. */
#if 0
		size = sizeof(cp_time_last);
		if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_last, &size,
		    NULL, 0) < 0)
			err(-1, "sysctlbyname: %s", SYSCTLNAME_CPTIME);
		while (1) {
			sleep(10);
			size = sizeof(cp_time_last);
			if (sysctlbyname(SYSCTLNAME_CPTIME, &cp_time_now,
			    &size, NULL, 0) < 0)
				err(-1, "sysctlbyname: %s",
				    SYSCTLNAME_CPTIME);
			ticks = 0;
			for (i = 0; i < CPUSTATES; i++) {
				cp_time_last[i] = cp_time_now[i] -
				    cp_time_last[i];
				ticks += cp_time_last[i];
			}
			printf("user%% %lu nice%% %lu sys%% %lu intr%% %lu "
			    "idle%% %lu\n",
			    (100 * cp_time_last[CP_USER]) / ticks,
			    (100 * cp_time_last[CP_NICE]) / ticks,
			    (100 * cp_time_last[CP_SYS]) / ticks,
			    (100 * cp_time_last[CP_INTR]) / ticks,
			    (100 * cp_time_last[CP_IDLE]) / ticks);
			bcopy(cp_time_now, cp_time_last, sizeof(cp_time_last));
		}
#endif

	/*
	 * GC workers.  Retry only when the wait was interrupted by a signal;
	 * any other failure (e.g. ECHILD) would never resolve, so report it
	 * and move on instead of spinning forever.
	 */
	for (i = 0; i < pflag; i++) {
		if (pid_list[i] == 0)
			continue;
		while (waitpid(pid_list[i], NULL, 0) < 0) {
			if (errno != EINTR) {
				warn("waitpid");
				break;
			}
		}
	}

	free(pid_list);
	pid_list = NULL;
}
350