1/*++
2/* NAME
3/*	qmqp-sink 1
4/* SUMMARY
5/*	multi-threaded QMQP test server
6/* SYNOPSIS
7/* .fi
8/*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
9/*	[\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR
10/*
11/*	\fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR]
12/*	\fBunix:\fR\fIpathname\fR \fIbacklog\fR
13/* DESCRIPTION
14/*	\fBqmqp-sink\fR listens on the named host (or address) and port.
15/*	It receives messages from the network and throws them away.
16/*	The purpose is to measure QMQP client performance, not protocol
17/*	compliance.
18/*	Connections can be accepted on IPv4 or IPv6 endpoints, or on
19/*	UNIX-domain sockets.
20/*	IPv4 and IPv6 are the default.
21/*	This program is the complement of the \fBqmqp-source\fR(1) program.
22/*
23/*	Note: this is an unsupported test program. No attempt is made
24/*	to maintain compatibility between successive versions.
25/*
26/*	Arguments:
27/* .IP \fB-4\fR
28/*	Support IPv4 only. This option has no effect when
29/*	Postfix is built without IPv6 support.
30/* .IP \fB-6\fR
31/*	Support IPv6 only. This option is not available when
32/*	Postfix is built without IPv6 support.
33/* .IP \fB-c\fR
34/*	Display a running counter that is updated whenever a delivery
35/*	is completed.
36/* .IP \fB-v\fR
37/*	Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP
38/*	conversation.
39/* .IP "\fB-x \fItime\fR
40/*	Terminate after \fItime\fR seconds. This is to facilitate memory
41/*	leak testing.
42/* SEE ALSO
43/*	qmqp-source(1), QMQP message generator
44/* LICENSE
45/* .ad
46/* .fi
47/*	The Secure Mailer license must be distributed with this software.
48/* AUTHOR(S)
49/*	Wietse Venema
50/*	IBM T.J. Watson Research
51/*	P.O. Box 704
52/*	Yorktown Heights, NY 10598, USA
53/*--*/
54
55/* System library. */
56
57#include <sys_defs.h>
58#include <sys/socket.h>
59#include <sys/wait.h>
60#include <unistd.h>
61#include <string.h>
62#include <stdlib.h>
63#include <fcntl.h>
64#include <signal.h>
65
66/* Utility library. */
67
68#include <msg.h>
69#include <vstring.h>
70#include <vstream.h>
71#include <listen.h>
72#include <events.h>
73#include <mymalloc.h>
74#include <iostuff.h>
75#include <msg_vstream.h>
76#include <netstring.h>
77#include <inet_proto.h>
78
79/* Global library. */
80
81#include <qmqp_proto.h>
82#include <mail_version.h>
83
84/* Application-specific. */
85
86typedef struct {
87    VSTREAM *stream;			/* client connection */
88    int     count;			/* bytes to go */
89} SINK_STATE;
90
91static int var_tmout;
92static VSTRING *buffer;
93static void disconnect(SINK_STATE *);
94static int count_deliveries;
95static int counter;
96
97/* send_reply - finish conversation */
98
99static void send_reply(SINK_STATE *state)
100{
101    vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK);
102    NETSTRING_PUT_BUF(state->stream, buffer);
103    netstring_fflush(state->stream);
104    if (count_deliveries) {
105	counter++;
106	vstream_printf("%d\r", counter);
107	vstream_fflush(VSTREAM_OUT);
108    }
109    disconnect(state);
110}
111
112/* read_data - read over-all netstring data */
113
114static void read_data(int unused_event, char *context)
115{
116    SINK_STATE *state = (SINK_STATE *) context;
117    int     fd = vstream_fileno(state->stream);
118    int     count;
119
120    /*
121     * Refill the VSTREAM buffer, if necessary.
122     */
123    if (VSTREAM_GETC(state->stream) == VSTREAM_EOF)
124	netstring_except(state->stream, vstream_ftimeout(state->stream) ?
125			 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF);
126    state->count--;
127
128    /*
129     * Flush the VSTREAM buffer. As documented, vstream_fseek() discards
130     * unread input.
131     */
132    if ((count = vstream_peek(state->stream)) > 0) {
133	state->count -= count;
134	if (state->count <= 0) {
135	    send_reply(state);
136	    return;
137	}
138	vstream_fseek(state->stream, 0L, 0);
139    }
140
141    /*
142     * Do not block while waiting for the arrival of more data.
143     */
144    event_disable_readwrite(fd);
145    event_enable_read(fd, read_data, context);
146}
147
148/* read_length - read over-all netstring length */
149
150static void read_length(int event, char *context)
151{
152    SINK_STATE *state = (SINK_STATE *) context;
153
154    switch (vstream_setjmp(state->stream)) {
155
156    default:
157	msg_panic("unknown error reading input");
158
159    case NETSTRING_ERR_TIME:
160	msg_panic("attempt to read non-readable socket");
161	/* NOTREACHED */
162
163    case NETSTRING_ERR_EOF:
164	msg_warn("lost connection");
165	disconnect(state);
166	return;
167
168    case NETSTRING_ERR_FORMAT:
169	msg_warn("netstring format error");
170	disconnect(state);
171	return;
172
173    case NETSTRING_ERR_SIZE:
174	msg_warn("netstring size error");
175	disconnect(state);
176	return;
177
178	/*
179	 * Include the netstring terminator in the read byte count. This
180	 * violates abstractions.
181	 */
182    case 0:
183	state->count = netstring_get_length(state->stream) + 1;
184	read_data(event, context);
185	return;
186    }
187}
188
189/* disconnect - handle disconnection events */
190
191static void disconnect(SINK_STATE *state)
192{
193    event_disable_readwrite(vstream_fileno(state->stream));
194    vstream_fclose(state->stream);
195    myfree((char *) state);
196}
197
198/* connect_event - handle connection events */
199
200static void connect_event(int unused_event, char *context)
201{
202    int     sock = CAST_CHAR_PTR_TO_INT(context);
203    struct sockaddr sa;
204    SOCKADDR_SIZE len = sizeof(sa);
205    SINK_STATE *state;
206    int     fd;
207
208    if ((fd = accept(sock, &sa, &len)) >= 0) {
209	if (msg_verbose)
210	    msg_info("connect (%s)",
211#ifdef AF_LOCAL
212		     sa.sa_family == AF_LOCAL ? "AF_LOCAL" :
213#else
214		     sa.sa_family == AF_UNIX ? "AF_UNIX" :
215#endif
216		     sa.sa_family == AF_INET ? "AF_INET" :
217#ifdef AF_INET6
218		     sa.sa_family == AF_INET6 ? "AF_INET6" :
219#endif
220		     "unknown protocol family");
221	non_blocking(fd, NON_BLOCKING);
222	state = (SINK_STATE *) mymalloc(sizeof(*state));
223	state->stream = vstream_fdopen(fd, O_RDWR);
224	vstream_tweak_sock(state->stream);
225	netstring_setup(state->stream, var_tmout);
226	event_enable_read(fd, read_length, (char *) state);
227    }
228}
229
230/* terminate - voluntary exit */
231
232static void terminate(int unused_event, char *unused_context)
233{
234    exit(0);
235}
236
237/* usage - explain */
238
239static void usage(char *myname)
240{
241    msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname);
242}
243
244MAIL_VERSION_STAMP_DECLARE;
245
246int     main(int argc, char **argv)
247{
248    int     sock;
249    int     backlog;
250    int     ch;
251    int     ttl;
252    const char *protocols = INET_PROTO_NAME_ALL;
253    INET_PROTO_INFO *proto_info;
254
255    /*
256     * Fingerprint executables and core dumps.
257     */
258    MAIL_VERSION_STAMP_ALLOCATE;
259
260    /*
261     * Fix 20051207.
262     */
263    signal(SIGPIPE, SIG_IGN);
264
265    /*
266     * Initialize diagnostics.
267     */
268    msg_vstream_init(argv[0], VSTREAM_ERR);
269
270    /*
271     * Parse JCL.
272     */
273    while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) {
274	switch (ch) {
275	case '4':
276	    protocols = INET_PROTO_NAME_IPV4;
277	    break;
278	case '6':
279	    protocols = INET_PROTO_NAME_IPV6;
280	    break;
281	case 'c':
282	    count_deliveries++;
283	    break;
284	case 'v':
285	    msg_verbose++;
286	    break;
287	case 'x':
288	    if ((ttl = atoi(optarg)) <= 0)
289		usage(argv[0]);
290	    event_request_timer(terminate, (char *) 0, ttl);
291	    break;
292	default:
293	    usage(argv[0]);
294	}
295    }
296    if (argc - optind != 2)
297	usage(argv[0]);
298    if ((backlog = atoi(argv[optind + 1])) <= 0)
299	usage(argv[0]);
300
301    /*
302     * Initialize.
303     */
304    proto_info = inet_proto_init("protocols", protocols);
305    buffer = vstring_alloc(1024);
306    if (strncmp(argv[optind], "unix:", 5) == 0) {
307	sock = unix_listen(argv[optind] + 5, backlog, BLOCKING);
308    } else {
309	if (strncmp(argv[optind], "inet:", 5) == 0)
310	    argv[optind] += 5;
311	sock = inet_listen(argv[optind], backlog, BLOCKING);
312    }
313
314    /*
315     * Start the event handler.
316     */
317    event_enable_read(sock, connect_event, CAST_INT_TO_CHAR_PTR(sock));
318    for (;;)
319	event_loop(-1);
320}
321