1/*++ 2/* NAME 3/* qmqp-sink 1 4/* SUMMARY 5/* multi-threaded QMQP test server 6/* SYNOPSIS 7/* .fi 8/* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 9/* [\fBinet:\fR][\fIhost\fR]:\fIport\fR \fIbacklog\fR 10/* 11/* \fBqmqp-sink\fR [\fB-46cv\fR] [\fB-x \fItime\fR] 12/* \fBunix:\fR\fIpathname\fR \fIbacklog\fR 13/* DESCRIPTION 14/* \fBqmqp-sink\fR listens on the named host (or address) and port. 15/* It receives messages from the network and throws them away. 16/* The purpose is to measure QMQP client performance, not protocol 17/* compliance. 18/* Connections can be accepted on IPv4 or IPv6 endpoints, or on 19/* UNIX-domain sockets. 20/* IPv4 and IPv6 are the default. 21/* This program is the complement of the \fBqmqp-source\fR(1) program. 22/* 23/* Note: this is an unsupported test program. No attempt is made 24/* to maintain compatibility between successive versions. 25/* 26/* Arguments: 27/* .IP \fB-4\fR 28/* Support IPv4 only. This option has no effect when 29/* Postfix is built without IPv6 support. 30/* .IP \fB-6\fR 31/* Support IPv6 only. This option is not available when 32/* Postfix is built without IPv6 support. 33/* .IP \fB-c\fR 34/* Display a running counter that is updated whenever a delivery 35/* is completed. 36/* .IP \fB-v\fR 37/* Increase verbosity. Specify \fB-v -v\fR to see some of the QMQP 38/* conversation. 39/* .IP "\fB-x \fItime\fR 40/* Terminate after \fItime\fR seconds. This is to facilitate memory 41/* leak testing. 42/* SEE ALSO 43/* qmqp-source(1), QMQP message generator 44/* LICENSE 45/* .ad 46/* .fi 47/* The Secure Mailer license must be distributed with this software. 48/* AUTHOR(S) 49/* Wietse Venema 50/* IBM T.J. Watson Research 51/* P.O. Box 704 52/* Yorktown Heights, NY 10598, USA 53/*--*/ 54 55/* System library. */ 56 57#include <sys_defs.h> 58#include <sys/socket.h> 59#include <sys/wait.h> 60#include <unistd.h> 61#include <string.h> 62#include <stdlib.h> 63#include <fcntl.h> 64#include <signal.h> 65 66/* Utility library. */ 67 68#include <msg.h> 69#include <vstring.h> 70#include <vstream.h> 71#include <listen.h> 72#include <events.h> 73#include <mymalloc.h> 74#include <iostuff.h> 75#include <msg_vstream.h> 76#include <netstring.h> 77#include <inet_proto.h> 78 79/* Global library. */ 80 81#include <qmqp_proto.h> 82#include <mail_version.h> 83 84/* Application-specific. */ 85 86typedef struct { 87 VSTREAM *stream; /* client connection */ 88 int count; /* bytes to go */ 89} SINK_STATE; 90 91static int var_tmout; 92static VSTRING *buffer; 93static void disconnect(SINK_STATE *); 94static int count_deliveries; 95static int counter; 96 97/* send_reply - finish conversation */ 98 99static void send_reply(SINK_STATE *state) 100{ 101 vstring_sprintf(buffer, "%cOk", QMQP_STAT_OK); 102 NETSTRING_PUT_BUF(state->stream, buffer); 103 netstring_fflush(state->stream); 104 if (count_deliveries) { 105 counter++; 106 vstream_printf("%d\r", counter); 107 vstream_fflush(VSTREAM_OUT); 108 } 109 disconnect(state); 110} 111 112/* read_data - read over-all netstring data */ 113 114static void read_data(int unused_event, char *context) 115{ 116 SINK_STATE *state = (SINK_STATE *) context; 117 int fd = vstream_fileno(state->stream); 118 int count; 119 120 /* 121 * Refill the VSTREAM buffer, if necessary. 122 */ 123 if (VSTREAM_GETC(state->stream) == VSTREAM_EOF) 124 netstring_except(state->stream, vstream_ftimeout(state->stream) ? 125 NETSTRING_ERR_TIME : NETSTRING_ERR_EOF); 126 state->count--; 127 128 /* 129 * Flush the VSTREAM buffer. As documented, vstream_fseek() discards 130 * unread input. 131 */ 132 if ((count = vstream_peek(state->stream)) > 0) { 133 state->count -= count; 134 if (state->count <= 0) { 135 send_reply(state); 136 return; 137 } 138 vstream_fseek(state->stream, 0L, 0); 139 } 140 141 /* 142 * Do not block while waiting for the arrival of more data. 143 */ 144 event_disable_readwrite(fd); 145 event_enable_read(fd, read_data, context); 146} 147 148/* read_length - read over-all netstring length */ 149 150static void read_length(int event, char *context) 151{ 152 SINK_STATE *state = (SINK_STATE *) context; 153 154 switch (vstream_setjmp(state->stream)) { 155 156 default: 157 msg_panic("unknown error reading input"); 158 159 case NETSTRING_ERR_TIME: 160 msg_panic("attempt to read non-readable socket"); 161 /* NOTREACHED */ 162 163 case NETSTRING_ERR_EOF: 164 msg_warn("lost connection"); 165 disconnect(state); 166 return; 167 168 case NETSTRING_ERR_FORMAT: 169 msg_warn("netstring format error"); 170 disconnect(state); 171 return; 172 173 case NETSTRING_ERR_SIZE: 174 msg_warn("netstring size error"); 175 disconnect(state); 176 return; 177 178 /* 179 * Include the netstring terminator in the read byte count. This 180 * violates abstractions. 181 */ 182 case 0: 183 state->count = netstring_get_length(state->stream) + 1; 184 read_data(event, context); 185 return; 186 } 187} 188 189/* disconnect - handle disconnection events */ 190 191static void disconnect(SINK_STATE *state) 192{ 193 event_disable_readwrite(vstream_fileno(state->stream)); 194 vstream_fclose(state->stream); 195 myfree((char *) state); 196} 197 198/* connect_event - handle connection events */ 199 200static void connect_event(int unused_event, char *context) 201{ 202 int sock = CAST_CHAR_PTR_TO_INT(context); 203 struct sockaddr sa; 204 SOCKADDR_SIZE len = sizeof(sa); 205 SINK_STATE *state; 206 int fd; 207 208 if ((fd = accept(sock, &sa, &len)) >= 0) { 209 if (msg_verbose) 210 msg_info("connect (%s)", 211#ifdef AF_LOCAL 212 sa.sa_family == AF_LOCAL ? "AF_LOCAL" : 213#else 214 sa.sa_family == AF_UNIX ? "AF_UNIX" : 215#endif 216 sa.sa_family == AF_INET ? "AF_INET" : 217#ifdef AF_INET6 218 sa.sa_family == AF_INET6 ? "AF_INET6" : 219#endif 220 "unknown protocol family"); 221 non_blocking(fd, NON_BLOCKING); 222 state = (SINK_STATE *) mymalloc(sizeof(*state)); 223 state->stream = vstream_fdopen(fd, O_RDWR); 224 vstream_tweak_sock(state->stream); 225 netstring_setup(state->stream, var_tmout); 226 event_enable_read(fd, read_length, (char *) state); 227 } 228} 229 230/* terminate - voluntary exit */ 231 232static void terminate(int unused_event, char *unused_context) 233{ 234 exit(0); 235} 236 237/* usage - explain */ 238 239static void usage(char *myname) 240{ 241 msg_fatal("usage: %s [-cv] [-x time] [host]:port backlog", myname); 242} 243 244MAIL_VERSION_STAMP_DECLARE; 245 246int main(int argc, char **argv) 247{ 248 int sock; 249 int backlog; 250 int ch; 251 int ttl; 252 const char *protocols = INET_PROTO_NAME_ALL; 253 INET_PROTO_INFO *proto_info; 254 255 /* 256 * Fingerprint executables and core dumps. 257 */ 258 MAIL_VERSION_STAMP_ALLOCATE; 259 260 /* 261 * Fix 20051207. 262 */ 263 signal(SIGPIPE, SIG_IGN); 264 265 /* 266 * Initialize diagnostics. 267 */ 268 msg_vstream_init(argv[0], VSTREAM_ERR); 269 270 /* 271 * Parse JCL. 272 */ 273 while ((ch = GETOPT(argc, argv, "46cvx:")) > 0) { 274 switch (ch) { 275 case '4': 276 protocols = INET_PROTO_NAME_IPV4; 277 break; 278 case '6': 279 protocols = INET_PROTO_NAME_IPV6; 280 break; 281 case 'c': 282 count_deliveries++; 283 break; 284 case 'v': 285 msg_verbose++; 286 break; 287 case 'x': 288 if ((ttl = atoi(optarg)) <= 0) 289 usage(argv[0]); 290 event_request_timer(terminate, (char *) 0, ttl); 291 break; 292 default: 293 usage(argv[0]); 294 } 295 } 296 if (argc - optind != 2) 297 usage(argv[0]); 298 if ((backlog = atoi(argv[optind + 1])) <= 0) 299 usage(argv[0]); 300 301 /* 302 * Initialize. 303 */ 304 proto_info = inet_proto_init("protocols", protocols); 305 buffer = vstring_alloc(1024); 306 if (strncmp(argv[optind], "unix:", 5) == 0) { 307 sock = unix_listen(argv[optind] + 5, backlog, BLOCKING); 308 } else { 309 if (strncmp(argv[optind], "inet:", 5) == 0) 310 argv[optind] += 5; 311 sock = inet_listen(argv[optind], backlog, BLOCKING); 312 } 313 314 /* 315 * Start the event handler. 316 */ 317 event_enable_read(sock, connect_event, CAST_INT_TO_CHAR_PTR(sock)); 318 for (;;) 319 event_loop(-1); 320} 321