1/*++ 2/* NAME 3/* qmqp-source 1 4/* SUMMARY 5/* multi-threaded QMQP test generator 6/* SYNOPSIS 7/* .fi 8/* \fBqmqp-source\fR [\fIoptions\fR] [\fBinet:\fR]\fIhost\fR[:\fIport\fR] 9/* 10/* \fBqmqp-source\fR [\fIoptions\fR] \fBunix:\fIpathname\fR 11/* DESCRIPTION 12/* \fBqmqp-source\fR connects to the named host and TCP port (default 628) 13/* and sends one or more messages to it, either sequentially 14/* or in parallel. The program speaks the QMQP protocol. 15/* Connections can be made to UNIX-domain and IPv4 or IPv6 servers. 16/* IPv4 and IPv6 are the default. 17/* 18/* Note: this is an unsupported test program. No attempt is made 19/* to maintain compatibility between successive versions. 20/* 21/* Arguments: 22/* .IP \fB-4\fR 23/* Connect to the server with IPv4. This option has no effect when 24/* Postfix is built without IPv6 support. 25/* .IP \fB-6\fR 26/* Connect to the server with IPv6. This option is not available when 27/* Postfix is built without IPv6 support. 28/* .IP \fB-c\fR 29/* Display a running counter that is incremented each time 30/* a delivery completes. 31/* .IP "\fB-C \fIcount\fR" 32/* When a host sends RESET instead of SYN|ACK, try \fIcount\fR times 33/* before giving up. The default count is 1. Specify a larger count in 34/* order to work around a problem with TCP/IP stacks that send RESET 35/* when the listen queue is full. 36/* .IP "\fB-f \fIfrom\fR" 37/* Use the specified sender address (default: <foo@myhostname>). 38/* .IP "\fB-l \fIlength\fR" 39/* Send \fIlength\fR bytes as message payload. The length 40/* includes the message headers. 41/* .IP "\fB-m \fImessage_count\fR" 42/* Send the specified number of messages (default: 1). 43/* .IP "\fB-M \fImyhostname\fR" 44/* Use the specified hostname or [address] in the default 45/* sender and recipient addresses, instead of the machine 46/* hostname. 47/* .IP "\fB-r \fIrecipient_count\fR" 48/* Send the specified number of recipients per transaction (default: 1). 49/* Recipient names are generated by prepending a number to the 50/* recipient address. 51/* .IP "\fB-s \fIsession_count\fR" 52/* Run the specified number of QMQP sessions in parallel (default: 1). 53/* .IP "\fB-t \fIto\fR" 54/* Use the specified recipient address (default: <foo@myhostname>). 55/* .IP "\fB-R \fIinterval\fR" 56/* Wait for a random period of time 0 <= n <= interval between messages. 57/* Suspending one thread does not affect other delivery threads. 58/* .IP \fB-v\fR 59/* Make the program more verbose, for debugging purposes. 60/* .IP "\fB-w \fIinterval\fR" 61/* Wait a fixed time between messages. 62/* Suspending one thread does not affect other delivery threads. 63/* SEE ALSO 64/* qmqp-sink(1), QMQP message dump 65/* LICENSE 66/* .ad 67/* .fi 68/* The Secure Mailer license must be distributed with this software. 69/* AUTHOR(S) 70/* Wietse Venema 71/* IBM T.J. Watson Research 72/* P.O. Box 704 73/* Yorktown Heights, NY 10598, USA 74/*--*/ 75 76/* System library. */ 77 78#include <sys_defs.h> 79#include <sys/socket.h> 80#include <sys/wait.h> 81#include <netinet/in.h> 82#include <sys/un.h> 83#include <stdlib.h> 84#include <unistd.h> 85#include <signal.h> 86#include <errno.h> 87#include <string.h> 88 89/* Utility library. */ 90 91#include <msg.h> 92#include <msg_vstream.h> 93#include <vstring.h> 94#include <vstream.h> 95#include <get_hostname.h> 96#include <split_at.h> 97#include <connect.h> 98#include <mymalloc.h> 99#include <events.h> 100#include <iostuff.h> 101#include <netstring.h> 102#include <sane_connect.h> 103#include <host_port.h> 104#include <myaddrinfo.h> 105#include <inet_proto.h> 106#include <valid_hostname.h> 107#include <valid_mailhost_addr.h> 108 109/* Global library. */ 110 111#include <mail_date.h> 112#include <qmqp_proto.h> 113#include <mail_version.h> 114 115/* Application-specific. */ 116 117 /* 118 * Per-session data structure with state. 119 * 120 * This software can maintain multiple parallel connections to the same QMQP 121 * server. However, it makes no more than one connection request at a time 122 * to avoid overwhelming the server with SYN packets and having to back off. 123 * Back-off would screw up the benchmark. Pending connection requests are 124 * kept in a linear list. 125 */ 126typedef struct SESSION { 127 int xfer_count; /* # of xfers in session */ 128 int rcpt_done; /* # of recipients done */ 129 int rcpt_count; /* # of recipients to go */ 130 VSTREAM *stream; /* open connection */ 131 int connect_count; /* # of connect()s to retry */ 132 struct SESSION *next; /* connect() queue linkage */ 133} SESSION; 134 135static SESSION *last_session; /* connect() queue tail */ 136 137static VSTRING *buffer; 138static int var_line_limit = 10240; 139static int var_timeout = 300; 140static const char *var_myhostname; 141static int session_count; 142static int message_count = 1; 143static struct sockaddr_storage ss; 144 145#undef sun 146static struct sockaddr_un sun; 147static struct sockaddr *sa; 148static int sa_length; 149static int recipients = 1; 150static char *defaddr; 151static char *recipient; 152static char *sender; 153static int message_length = 1024; 154static int count = 0; 155static int counter = 0; 156static int connect_count = 1; 157static int random_delay = 0; 158static int fixed_delay = 0; 159static const char *mydate; 160static int mypid; 161 162static void enqueue_connect(SESSION *); 163static void start_connect(SESSION *); 164static void connect_done(int, char *); 165 166static void send_data(SESSION *); 167static void receive_reply(int, char *); 168 169static VSTRING *message_buffer; 170static VSTRING *sender_buffer; 171static VSTRING *recipient_buffer; 172 173/* Silly little macros. */ 174 175#define STR(x) vstring_str(x) 176#define LEN(x) VSTRING_LEN(x) 177 178/* random_interval - generate a random value in 0 .. (small) interval */ 179 180static int random_interval(int interval) 181{ 182 return (rand() % (interval + 1)); 183} 184 185/* socket_error - look up and reset the last socket error */ 186 187static int socket_error(int sock) 188{ 189 int error; 190 SOCKOPT_SIZE error_len; 191 192 /* 193 * Some Solaris 2 versions have getsockopt() itself return the error, 194 * instead of returning it via the parameter list. 195 */ 196 error = 0; 197 error_len = sizeof(error); 198 if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) < 0) 199 return (-1); 200 if (error) { 201 errno = error; 202 return (-1); 203 } 204 205 /* 206 * No problems. 207 */ 208 return (0); 209} 210 211/* exception_text - translate exceptions from the netstring module */ 212 213static char *exception_text(int except) 214{ 215 ; 216 217 switch (except) { 218 case NETSTRING_ERR_EOF: 219 return ("lost connection"); 220 case NETSTRING_ERR_TIME: 221 return ("timeout"); 222 case NETSTRING_ERR_FORMAT: 223 return ("netstring format error"); 224 case NETSTRING_ERR_SIZE: 225 return ("netstring size exceeds limit"); 226 default: 227 msg_panic("exception_text: unknown exception %d", except); 228 } 229 /* NOTREACHED */ 230} 231 232/* startup - connect to server but do not wait */ 233 234static void startup(SESSION *session) 235{ 236 if (message_count-- <= 0) { 237 myfree((char *) session); 238 session_count--; 239 return; 240 } 241 enqueue_connect(session); 242} 243 244/* start_event - invoke startup from timer context */ 245 246static void start_event(int unused_event, char *context) 247{ 248 SESSION *session = (SESSION *) context; 249 250 startup(session); 251} 252 253/* start_another - start another session */ 254 255static void start_another(SESSION *session) 256{ 257 if (random_delay > 0) { 258 event_request_timer(start_event, (char *) session, 259 random_interval(random_delay)); 260 } else if (fixed_delay > 0) { 261 event_request_timer(start_event, (char *) session, fixed_delay); 262 } else { 263 startup(session); 264 } 265} 266 267/* enqueue_connect - queue a connection request */ 268 269static void enqueue_connect(SESSION *session) 270{ 271 session->next = 0; 272 if (last_session == 0) { 273 last_session = session; 274 start_connect(session); 275 } else { 276 last_session->next = session; 277 last_session = session; 278 } 279} 280 281/* dequeue_connect - connection request completed */ 282 283static void dequeue_connect(SESSION *session) 284{ 285 if (session == last_session) { 286 if (session->next != 0) 287 msg_panic("dequeue_connect: queue ends after last"); 288 last_session = 0; 289 } else { 290 if (session->next == 0) 291 msg_panic("dequeue_connect: queue ends before last"); 292 start_connect(session->next); 293 } 294} 295 296/* fail_connect - handle failed startup */ 297 298static void fail_connect(SESSION *session) 299{ 300 if (session->connect_count-- == 1) 301 msg_fatal("connect: %m"); 302 msg_warn("connect: %m"); 303 event_disable_readwrite(vstream_fileno(session->stream)); 304 vstream_fclose(session->stream); 305 session->stream = 0; 306#ifdef MISSING_USLEEP 307 doze(10); 308#else 309 usleep(10); 310#endif 311 start_connect(session); 312} 313 314/* start_connect - start TCP handshake */ 315 316static void start_connect(SESSION *session) 317{ 318 int fd; 319 struct linger linger; 320 321 /* 322 * Some systems don't set the socket error when connect() fails early 323 * (loopback) so we must deal with the error immediately, rather than 324 * retrieving it later with getsockopt(). We can't use MSG_PEEK to 325 * distinguish between server disconnect and connection refused. 326 */ 327 if ((fd = socket(sa->sa_family, SOCK_STREAM, 0)) < 0) 328 msg_fatal("socket: %m"); 329 (void) non_blocking(fd, NON_BLOCKING); 330 linger.l_onoff = 1; 331 linger.l_linger = 0; 332 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &linger, 333 sizeof(linger)) < 0) 334 msg_warn("setsockopt SO_LINGER %d: %m", linger.l_linger); 335 session->stream = vstream_fdopen(fd, O_RDWR); 336 event_enable_write(fd, connect_done, (char *) session); 337 netstring_setup(session->stream, var_timeout); 338 if (sane_connect(fd, sa, sa_length) < 0 && errno != EINPROGRESS) 339 fail_connect(session); 340} 341 342/* connect_done - send message sender info */ 343 344static void connect_done(int unused_event, char *context) 345{ 346 SESSION *session = (SESSION *) context; 347 int fd = vstream_fileno(session->stream); 348 349 /* 350 * Try again after some delay when the connection failed, in case they 351 * run a Mickey Mouse protocol stack. 352 */ 353 if (socket_error(fd) < 0) { 354 fail_connect(session); 355 } else { 356 dequeue_connect(session); 357 non_blocking(fd, BLOCKING); 358 event_disable_readwrite(fd); 359 /* Avoid poor performance when TCP MSS > VSTREAM_BUFSIZE. */ 360 if (sa->sa_family == AF_INET 361#ifdef AF_INET6 362 || sa->sa_family == AF_INET6 363#endif 364 ) 365 vstream_tweak_tcp(session->stream); 366 send_data(session); 367 } 368} 369 370/* send_data - send message+sender+recipients */ 371 372static void send_data(SESSION *session) 373{ 374 int fd = vstream_fileno(session->stream); 375 int except; 376 377 /* 378 * Prepare for disaster. 379 */ 380 if ((except = vstream_setjmp(session->stream)) != 0) 381 msg_fatal("%s while sending message", exception_text(except)); 382 383 /* 384 * Send the message content, by wrapping three netstrings into an 385 * over-all netstring. 386 * 387 * XXX This should be done more carefully to avoid blocking when sending 388 * large messages over slow networks. 389 */ 390 netstring_put_multi(session->stream, 391 STR(message_buffer), LEN(message_buffer), 392 STR(sender_buffer), LEN(sender_buffer), 393 STR(recipient_buffer), LEN(recipient_buffer), 394 (char *) 0); 395 netstring_fflush(session->stream); 396 397 /* 398 * Wake me up when the server replies or when something bad happens. 399 */ 400 event_enable_read(fd, receive_reply, (char *) session); 401} 402 403/* receive_reply - read server reply */ 404 405static void receive_reply(int unused_event, char *context) 406{ 407 SESSION *session = (SESSION *) context; 408 int except; 409 410 /* 411 * Prepare for disaster. 412 */ 413 if ((except = vstream_setjmp(session->stream)) != 0) 414 msg_fatal("%s while receiving server reply", exception_text(except)); 415 416 /* 417 * Receive and process the server reply. 418 */ 419 netstring_get(session->stream, buffer, var_line_limit); 420 if (msg_verbose) 421 vstream_printf("<< %.*s\n", (int) LEN(buffer), STR(buffer)); 422 if (STR(buffer)[0] != QMQP_STAT_OK) 423 msg_fatal("%s error: %.*s", 424 STR(buffer)[0] == QMQP_STAT_RETRY ? "recoverable" : 425 STR(buffer)[0] == QMQP_STAT_HARD ? "unrecoverable" : 426 "unknown", (int) LEN(buffer) - 1, STR(buffer) + 1); 427 428 /* 429 * Update the optional running counter. 430 */ 431 if (count) { 432 counter++; 433 vstream_printf("%d\r", counter); 434 vstream_fflush(VSTREAM_OUT); 435 } 436 437 /* 438 * Finish this session. QMQP sends only one message per session. 439 */ 440 event_disable_readwrite(vstream_fileno(session->stream)); 441 vstream_fclose(session->stream); 442 session->stream = 0; 443 start_another(session); 444} 445 446/* usage - explain */ 447 448static void usage(char *myname) 449{ 450 msg_fatal("usage: %s -cv -s sess -l msglen -m msgs -C count -M myhostname -f from -t to -R delay -w delay host[:port]", myname); 451} 452 453MAIL_VERSION_STAMP_DECLARE; 454 455/* main - parse JCL and start the machine */ 456 457int main(int argc, char **argv) 458{ 459 SESSION *session; 460 char *host; 461 char *port; 462 char *path; 463 int path_len; 464 int sessions = 1; 465 int ch; 466 ssize_t len; 467 int n; 468 int i; 469 char *buf; 470 const char *parse_err; 471 struct addrinfo *res; 472 int aierr; 473 const char *protocols = INET_PROTO_NAME_ALL; 474 INET_PROTO_INFO *proto_info; 475 476 /* 477 * Fingerprint executables and core dumps. 478 */ 479 MAIL_VERSION_STAMP_ALLOCATE; 480 481 signal(SIGPIPE, SIG_IGN); 482 msg_vstream_init(argv[0], VSTREAM_ERR); 483 484 /* 485 * Parse JCL. 486 */ 487 while ((ch = GETOPT(argc, argv, "46cC:f:l:m:M:r:R:s:t:vw:")) > 0) { 488 switch (ch) { 489 case '4': 490 protocols = INET_PROTO_NAME_IPV4; 491 break; 492 case '6': 493 protocols = INET_PROTO_NAME_IPV6; 494 break; 495 case 'c': 496 count++; 497 break; 498 case 'C': 499 if ((connect_count = atoi(optarg)) <= 0) 500 usage(argv[0]); 501 break; 502 case 'f': 503 sender = optarg; 504 break; 505 case 'l': 506 if ((message_length = atoi(optarg)) <= 0) 507 usage(argv[0]); 508 break; 509 case 'm': 510 if ((message_count = atoi(optarg)) <= 0) 511 usage(argv[0]); 512 break; 513 case 'M': 514 if (*optarg == '[') { 515 if (!valid_mailhost_literal(optarg, DO_GRIPE)) 516 msg_fatal("bad address literal: %s", optarg); 517 } else { 518 if (!valid_hostname(optarg, DO_GRIPE)) 519 msg_fatal("bad hostname: %s", optarg); 520 } 521 var_myhostname = optarg; 522 break; 523 case 'r': 524 if ((recipients = atoi(optarg)) <= 0) 525 usage(argv[0]); 526 break; 527 case 'R': 528 if (fixed_delay > 0 || (random_delay = atoi(optarg)) <= 0) 529 usage(argv[0]); 530 break; 531 case 's': 532 if ((sessions = atoi(optarg)) <= 0) 533 usage(argv[0]); 534 break; 535 case 't': 536 recipient = optarg; 537 break; 538 case 'v': 539 msg_verbose++; 540 break; 541 case 'w': 542 if (random_delay > 0 || (fixed_delay = atoi(optarg)) <= 0) 543 usage(argv[0]); 544 break; 545 default: 546 usage(argv[0]); 547 } 548 } 549 if (argc - optind != 1) 550 usage(argv[0]); 551 552 if (random_delay > 0) 553 srand(getpid()); 554 555 /* 556 * Translate endpoint address to internal form. 557 */ 558 proto_info = inet_proto_init("protocols", protocols); 559 if (strncmp(argv[optind], "unix:", 5) == 0) { 560 path = argv[optind] + 5; 561 path_len = strlen(path); 562 if (path_len >= (int) sizeof(sun.sun_path)) 563 msg_fatal("unix-domain name too long: %s", path); 564 memset((char *) &sun, 0, sizeof(sun)); 565 sun.sun_family = AF_UNIX; 566#ifdef HAS_SUN_LEN 567 sun.sun_len = path_len + 1; 568#endif 569 memcpy(sun.sun_path, path, path_len); 570 sa = (struct sockaddr *) & sun; 571 sa_length = sizeof(sun); 572 } else { 573 if (strncmp(argv[optind], "inet:", 5) == 0) 574 argv[optind] += 5; 575 buf = mystrdup(argv[optind]); 576 if ((parse_err = host_port(buf, &host, (char *) 0, &port, "628")) != 0) 577 msg_fatal("%s: %s", argv[optind], parse_err); 578 if ((aierr = hostname_to_sockaddr(host, port, SOCK_STREAM, &res)) != 0) 579 msg_fatal("%s: %s", argv[optind], MAI_STRERROR(aierr)); 580 myfree(buf); 581 sa = (struct sockaddr *) & ss; 582 if (res->ai_addrlen > sizeof(ss)) 583 msg_fatal("address length %d > buffer length %d", 584 (int) res->ai_addrlen, (int) sizeof(ss)); 585 memcpy((char *) sa, res->ai_addr, res->ai_addrlen); 586 sa_length = res->ai_addrlen; 587#ifdef HAS_SA_LEN 588 sa->sa_len = sa_length; 589#endif 590 freeaddrinfo(res); 591 } 592 593 /* 594 * Allocate space for temporary buffer. 595 */ 596 buffer = vstring_alloc(100); 597 598 /* 599 * Make sure we have sender and recipient addresses. 600 */ 601 if (var_myhostname == 0) 602 var_myhostname = get_hostname(); 603 if (sender == 0 || recipient == 0) { 604 vstring_sprintf(buffer, "foo@%s", var_myhostname); 605 defaddr = mystrdup(vstring_str(buffer)); 606 if (sender == 0) 607 sender = defaddr; 608 if (recipient == 0) 609 recipient = defaddr; 610 } 611 612 /* 613 * Prepare some results that may be used multiple times: the message 614 * content netstring, the sender netstring, and the recipient netstrings. 615 */ 616 mydate = mail_date(time((time_t *) 0)); 617 mypid = getpid(); 618 619 message_buffer = vstring_alloc(message_length + 200); 620 vstring_sprintf(buffer, 621 "From: <%s>\nTo: <%s>\nDate: %s\nMessage-Id: <%d@%s>\n\n", 622 sender, recipient, mydate, mypid, var_myhostname); 623 for (n = 1; LEN(buffer) < message_length; n++) { 624 for (i = 0; i < n && i < 79; i++) 625 VSTRING_ADDCH(buffer, 'X'); 626 VSTRING_ADDCH(buffer, '\n'); 627 } 628 STR(buffer)[message_length - 1] = '\n'; 629 netstring_memcpy(message_buffer, STR(buffer), message_length); 630 631 len = strlen(sender); 632 sender_buffer = vstring_alloc(len); 633 netstring_memcpy(sender_buffer, sender, len); 634 635 if (recipients == 1) { 636 len = strlen(recipient); 637 recipient_buffer = vstring_alloc(len); 638 netstring_memcpy(recipient_buffer, recipient, len); 639 } else { 640 recipient_buffer = vstring_alloc(100); 641 for (n = 0; n < recipients; n++) { 642 vstring_sprintf(buffer, "%d%s", n, recipient); 643 netstring_memcat(recipient_buffer, STR(buffer), LEN(buffer)); 644 } 645 } 646 647 /* 648 * Start sessions. 649 */ 650 while (sessions-- > 0) { 651 session = (SESSION *) mymalloc(sizeof(*session)); 652 session->stream = 0; 653 session->xfer_count = 0; 654 session->connect_count = connect_count; 655 session->next = 0; 656 session_count++; 657 startup(session); 658 } 659 for (;;) { 660 event_loop(-1); 661 if (session_count <= 0 && message_count <= 0) { 662 if (count) { 663 VSTREAM_PUTC('\n', VSTREAM_OUT); 664 vstream_fflush(VSTREAM_OUT); 665 } 666 exit(0); 667 } 668 } 669} 670