1#include <AvailabilityMacros.h> 2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h> 4#endif 5 6#include <pthread.h> 7#include <stdio.h> 8#include <stdlib.h> 9#include <string.h> 10#include <err.h> 11#include <unistd.h> 12 13#include <pthread.h> 14#include <mach/mach.h> 15#include <mach/mach_error.h> 16#include <mach/notify.h> 17#include <servers/bootstrap.h> 18#include <sys/event.h> 19#include <sys/select.h> 20#include <sys/types.h> 21#include <sys/time.h> 22#include <sys/signal.h> 23 24#define MAX(A, B) ((A) < (B) ? (B) : (A)) 25 26 27typedef struct { 28 mach_msg_header_t header; 29 mach_msg_trailer_t trailer; // subtract this when sending 30} ipc_trivial_message; 31 32typedef struct { 33 mach_msg_header_t header; 34 u_int32_t numbers[0]; 35 mach_msg_trailer_t trailer; // subtract this when sending 36} ipc_inline_message; 37 38typedef struct { 39 mach_msg_header_t header; 40 mach_msg_body_t body; 41 mach_msg_ool_descriptor_t descriptor; 42 mach_msg_trailer_t trailer; // subtract this when sending 43} ipc_complex_message; 44 45enum { 46 msg_type_trivial = 0, 47 msg_type_inline = 1, 48 msg_type_complex = 2 49}; 50 51struct port_args { 52 int server_num; 53 int req_size; 54 mach_msg_header_t *req_msg; 55 int reply_size; 56 mach_msg_header_t *reply_msg; 57 mach_port_t port; 58 mach_port_t pset; 59}; 60 61typedef union { 62 pid_t pid; 63 pthread_t tid; 64} thread_id_t; 65 66/* Global options */ 67static boolean_t verbose = FALSE; 68static boolean_t affinity = FALSE; 69static boolean_t timeshare = FALSE; 70static boolean_t threaded = FALSE; 71static boolean_t oneway = FALSE; 72static boolean_t do_select = FALSE; 73int msg_type; 74int num_ints; 75int num_msgs; 76int num_clients; 77int num_servers; 78int client_delay; 79int client_spin; 80int client_pages; 81char **server_port_name; 82 83void signal_handler(int sig) { 84} 85 86void usage(const char *progname) { 87 fprintf(stderr, "usage: %s [options]\n", progname); 88 fprintf(stderr, "where options are:\n"); 89 fprintf(stderr, " -affinity\t\tthreads use affinity\n"); 90 fprintf(stderr, " -timeshare\t\tthreads use timeshare\n"); 91 fprintf(stderr, " -threaded\t\tuse (p)threads\n"); 92 fprintf(stderr, " -verbose\t\tbe verbose\n"); 93 fprintf(stderr, " -oneway\t\tdo not request return reply\n"); 94 fprintf(stderr, " -count num\t\tnumber of messages to send\n"); 95 fprintf(stderr, " -type trivial|inline|complex\ttype of messages to send\n"); 96 fprintf(stderr, " -numints num\tnumber of 32-bit ints to send in messages\n"); 97 fprintf(stderr, " -servers num\tnumber of servers threads to run\n"); 98 fprintf(stderr, " -clients num\tnumber of clients per server\n"); 99 fprintf(stderr, " -delay num\t\tmicroseconds to sleep clients between messages\n"); 100 fprintf(stderr, " -work num\t\tmicroseconds of client work\n"); 101 fprintf(stderr, " -pages num\t\tpages of memory touched by client work\n"); 102 fprintf(stderr, " -select \t\tselect prior to calling kevent().\n"); 103 fprintf(stderr, "default values are:\n"); 104 fprintf(stderr, " . no affinity\n"); 105 fprintf(stderr, " . not timeshare\n"); 106 fprintf(stderr, " . not verbose\n"); 107 fprintf(stderr, " . not oneway\n"); 108 fprintf(stderr, " . client sends 100000 messages\n"); 109 fprintf(stderr, " . inline message type\n"); 110 fprintf(stderr, " . 64 32-bit integers in inline/complex messages\n"); 111 fprintf(stderr, " . (num_available_processors+1)%%2 servers\n"); 112 fprintf(stderr, " . 4 clients per server\n"); 113 fprintf(stderr, " . no delay\n"); 114 exit(1); 115} 116 117void parse_args(int argc, char *argv[]) { 118 host_basic_info_data_t info; 119 mach_msg_type_number_t count; 120 kern_return_t result; 121 122 /* Initialize defaults */ 123 msg_type = msg_type_trivial; 124 num_ints = 64; 125 num_msgs = 100000; 126 client_delay = 0; 127 num_clients = 4; 128 129 count = HOST_BASIC_INFO_COUNT; 130 result = host_info(mach_host_self(), HOST_BASIC_INFO, 131 (host_info_t)&info, &count); 132 if (result == KERN_SUCCESS && info.avail_cpus > 1) 133 num_servers = info.avail_cpus / 2; 134 else 135 num_servers = 1; 136 137 const char *progname = argv[0]; 138 argc--; argv++; 139 while (0 < argc) { 140 if (0 == strcmp("-verbose", argv[0])) { 141 verbose = TRUE; 142 argc--; argv++; 143 } else if (0 == strcmp("-affinity", argv[0])) { 144 affinity = TRUE; 145 argc--; argv++; 146 } else if (0 == strcmp("-timeshare", argv[0])) { 147 timeshare = TRUE; 148 argc--; argv++; 149 } else if (0 == strcmp("-threaded", argv[0])) { 150 threaded = TRUE; 151 argc--; argv++; 152 } else if (0 == strcmp("-oneway", argv[0])) { 153 oneway = TRUE; 154 argc--; argv++; 155 } else if (0 == strcmp("-type", argv[0])) { 156 if (argc < 2) 157 usage(progname); 158 if (0 == strcmp("trivial", argv[1])) { 159 msg_type = msg_type_trivial; 160 } else if (0 == strcmp("inline", argv[1])) { 161 msg_type = msg_type_inline; 162 } else if (0 == strcmp("complex", argv[1])) { 163 msg_type = msg_type_complex; 164 } else 165 usage(progname); 166 argc -= 2; argv += 2; 167 } else if (0 == strcmp("-numints", argv[0])) { 168 if (argc < 2) 169 usage(progname); 170 num_ints = strtoul(argv[1], NULL, 0); 171 argc -= 2; argv += 2; 172 } else if (0 == strcmp("-count", argv[0])) { 173 if (argc < 2) 174 usage(progname); 175 num_msgs = strtoul(argv[1], NULL, 0); 176 argc -= 2; argv += 2; 177 } else if (0 == strcmp("-clients", argv[0])) { 178 if (argc < 2) 179 usage(progname); 180 num_clients = strtoul(argv[1], NULL, 0); 181 argc -= 2; argv += 2; 182 } else if (0 == strcmp("-servers", argv[0])) { 183 if (argc < 2) 184 usage(progname); 185 num_servers = strtoul(argv[1], NULL, 0); 186 argc -= 2; argv += 2; 187 } else if (0 == strcmp("-delay", argv[0])) { 188 if (argc < 2) 189 usage(progname); 190 client_delay = strtoul(argv[1], NULL, 0); 191 argc -= 2; argv += 2; 192 } else if (0 == strcmp("-spin", argv[0])) { 193 if (argc < 2) 194 usage(progname); 195 client_spin = strtoul(argv[1], NULL, 0); 196 argc -= 2; argv += 2; 197 } else if (0 == strcmp("-pages", argv[0])) { 198 if (argc < 2) 199 usage(progname); 200 client_pages = strtoul(argv[1], NULL, 0); 201 argc -= 2; argv += 2; 202 } else if (0 == strcmp("-select", argv[0])) { 203 do_select = TRUE; 204 argc--; argv++; 205 } else 206 usage(progname); 207 } 208} 209 210void setup_server_ports(struct port_args *ports) 211{ 212 kern_return_t ret = 0; 213 mach_port_t bsport; 214 215 ports->req_size = MAX(sizeof(ipc_inline_message) + 216 sizeof(u_int32_t) * num_ints, 217 sizeof(ipc_complex_message)); 218 ports->reply_size = sizeof(ipc_trivial_message) - 219 sizeof(mach_msg_trailer_t); 220 ports->req_msg = malloc(ports->req_size); 221 ports->reply_msg = malloc(ports->reply_size); 222 223 ret = mach_port_allocate(mach_task_self(), 224 MACH_PORT_RIGHT_RECEIVE, 225 &(ports->port)); 226 if (KERN_SUCCESS != ret) { 227 mach_error("mach_port_allocate(): ", ret); 228 exit(1); 229 } 230 231 ret = mach_port_allocate(mach_task_self(), 232 MACH_PORT_RIGHT_PORT_SET, 233 &(ports->pset)); 234 if (KERN_SUCCESS != ret) { 235 mach_error("mach_port_allocate(): ", ret); 236 exit(1); 237 } 238 239 ret = mach_port_insert_member(mach_task_self(), 240 ports->port, 241 ports->pset); 242 if (KERN_SUCCESS != ret) { 243 mach_error("mach_port_insert_member(): ", ret); 244 exit(1); 245 } 246 247 ret = mach_port_insert_right(mach_task_self(), 248 ports->port, 249 ports->port, 250 MACH_MSG_TYPE_MAKE_SEND); 251 if (KERN_SUCCESS != ret) { 252 mach_error("mach_port_insert_right(): ", ret); 253 exit(1); 254 } 255 256 ret = task_get_bootstrap_port(mach_task_self(), &bsport); 257 if (KERN_SUCCESS != ret) { 258 mach_error("task_get_bootstrap_port(): ", ret); 259 exit(1); 260 } 261 262 if (verbose) { 263 printf("server waiting for IPC messages from client on port '%s'.\n", 264 server_port_name[ports->server_num]); 265 } 266 ret = bootstrap_register(bsport, 267 server_port_name[ports->server_num], 268 ports->port); 269 if (KERN_SUCCESS != ret) { 270 mach_error("bootstrap_register(): ", ret); 271 exit(1); 272 } 273} 274 275void setup_client_ports(struct port_args *ports) 276{ 277 kern_return_t ret = 0; 278 switch(msg_type) { 279 case msg_type_trivial: 280 ports->req_size = sizeof(ipc_trivial_message); 281 break; 282 case msg_type_inline: 283 ports->req_size = sizeof(ipc_inline_message) + 284 sizeof(u_int32_t) * num_ints; 285 break; 286 case msg_type_complex: 287 ports->req_size = sizeof(ipc_complex_message); 288 break; 289 } 290 ports->req_size -= sizeof(mach_msg_trailer_t); 291 ports->reply_size = sizeof(ipc_trivial_message); 292 ports->req_msg = malloc(ports->req_size); 293 ports->reply_msg = malloc(ports->reply_size); 294 295 ret = mach_port_allocate(mach_task_self(), 296 MACH_PORT_RIGHT_RECEIVE, 297 &(ports->port)); 298 if (KERN_SUCCESS != ret) { 299 mach_error("mach_port_allocate(): ", ret); 300 exit(1); 301 } 302 if (verbose) { 303 printf("Client sending %d %s IPC messages to port '%s' in %s mode.\n", 304 num_msgs, (msg_type == msg_type_inline) ? 305 "inline" : ((msg_type == msg_type_complex) ? 306 "complex" : "trivial"), 307 server_port_name[ports->server_num], 308 (oneway ? "oneway" : "rpc")); 309 } 310 311} 312 313 314static void 315thread_setup(int tag) { 316#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 317 kern_return_t ret; 318 thread_extended_policy_data_t epolicy; 319 thread_affinity_policy_data_t policy; 320 321 if (!timeshare) { 322 epolicy.timeshare = FALSE; 323 ret = thread_policy_set( 324 mach_thread_self(), THREAD_EXTENDED_POLICY, 325 (thread_policy_t) &epolicy, 326 THREAD_EXTENDED_POLICY_COUNT); 327 if (ret != KERN_SUCCESS) 328 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); 329 } 330 331 if (affinity) { 332 policy.affinity_tag = tag; 333 ret = thread_policy_set( 334 mach_thread_self(), THREAD_AFFINITY_POLICY, 335 (thread_policy_t) &policy, 336 THREAD_AFFINITY_POLICY_COUNT); 337 if (ret != KERN_SUCCESS) 338 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); 339 } 340#endif 341} 342 343void * 344server(void *serverarg) 345{ 346 int kq; 347 struct kevent64_s kev[1]; 348 int err; 349 int count; 350 struct port_args args; 351 int idx; 352 kern_return_t ret; 353 int totalmsg = num_msgs * num_clients; 354 fd_set readfds; 355 356 args.server_num = (int) (long) serverarg; 357 setup_server_ports(&args); 358 359 thread_setup(args.server_num + 1); 360 361 kq = kqueue(); 362 if (kq == -1) { 363 perror("kqueue"); 364 exit(1); 365 } 366 EV_SET64(&kev[0], args.pset, EVFILT_MACHPORT, (EV_ADD | EV_CLEAR | EV_DISPATCH), 367#if DIRECT_MSG_RCV 368 MACH_RCV_MSG|MACH_RCV_LARGE, 0, 0, (mach_vm_address_t)args.req_msg, args.req_size); 369#else 370 0, 0, 0, 0, 0); 371#endif 372 err = kevent64(kq, kev, 1, NULL, 0, 0, NULL); 373 if (err == -1) { 374 perror("kevent"); 375 exit(1); 376 } 377 378 for (idx = 0; idx < totalmsg; idx++) { 379 380 if (verbose) 381 printf("server awaiting message %d\n", idx); 382 retry: 383 if (do_select) { 384 FD_ZERO(&readfds); 385 FD_SET(kq, &readfds); 386 387 if (verbose) 388 printf("Calling select() prior to kevent64().\n"); 389 390 count = select(kq + 1, &readfds, NULL, NULL, NULL); 391 if (count == -1) { 392 perror("select"); 393 exit(1); 394 } 395 } 396 397 EV_SET64(&kev[0], args.pset, EVFILT_MACHPORT, EV_ENABLE, 398#if DIRECT_MSG_RCV 399 MACH_RCV_MSG|MACH_RCV_LARGE, 0, 0, (mach_vm_address_t)args.req_msg, args.req_size); 400#else 401 0, 0, 0, 0, 0); 402#endif 403 err = kevent64(kq, kev, 1, kev, 1, 0, NULL); 404 if (err == -1) { 405 perror("kevent64"); 406 exit(1); 407 } 408 if (err == 0) { 409 // printf("kevent64: returned zero\n"); 410 goto retry; 411 } 412 413#if DIRECT_MSG_RCV 414 ret = kev[0].fflags; 415 if (MACH_MSG_SUCCESS != ret) { 416 if (verbose) 417 printf("kevent64() mach_msg_return=%d", ret); 418 mach_error("kevent64 (msg receive): ", ret); 419 exit(1); 420 } 421#else 422 if (kev[0].data != args.port) 423 printf("kevent64(MACH_PORT_NULL) port name (%lld) != expected (0x%x)\n", kev[0].data, args.port); 424 425 args.req_msg->msgh_bits = 0; 426 args.req_msg->msgh_size = args.req_size; 427 args.req_msg->msgh_local_port = args.port; 428 ret = mach_msg(args.req_msg, 429 MACH_RCV_MSG|MACH_RCV_INTERRUPT|MACH_RCV_LARGE, 430 0, 431 args.req_size, 432 args.pset, 433 MACH_MSG_TIMEOUT_NONE, 434 MACH_PORT_NULL); 435 if (MACH_RCV_INTERRUPTED == ret) 436 break; 437 if (MACH_MSG_SUCCESS != ret) { 438 if (verbose) 439 printf("mach_msg() ret=%d", ret); 440 mach_error("mach_msg (receive): ", ret); 441 exit(1); 442 } 443#endif 444 if (verbose) 445 printf("server received message %d\n", idx); 446 if (args.req_msg->msgh_bits & MACH_MSGH_BITS_COMPLEX) { 447 ret = vm_deallocate(mach_task_self(), 448 (vm_address_t)((ipc_complex_message *)args.req_msg)->descriptor.address, 449 ((ipc_complex_message *)args.req_msg)->descriptor.size); 450 } 451 452 if (1 == args.req_msg->msgh_id) { 453 if (verbose) 454 printf("server sending reply %d\n", idx); 455 args.reply_msg->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 456 MACH_MSG_TYPE_MAKE_SEND); 457 args.reply_msg->msgh_size = args.reply_size; 458 args.reply_msg->msgh_remote_port = args.req_msg->msgh_remote_port; 459 args.reply_msg->msgh_local_port = args.req_msg->msgh_local_port; 460 args.reply_msg->msgh_id = 2; 461 ret = mach_msg(args.reply_msg, 462 MACH_SEND_MSG, 463 args.reply_size, 464 0, 465 MACH_PORT_NULL, 466 MACH_MSG_TIMEOUT_NONE, 467 MACH_PORT_NULL); 468 if (MACH_MSG_SUCCESS != ret) { 469 mach_error("mach_msg (send): ", ret); 470 exit(1); 471 } 472 } 473 } 474 return NULL; 475} 476 477static inline void 478client_spin_loop(unsigned count, void (fn)(void)) 479{ 480 while (count--) 481 fn(); 482} 483 484static long dummy_memory; 485static long *client_memory = &dummy_memory; 486static void 487client_work_atom(void) 488{ 489 static int i; 490 491 if (++i > client_pages * PAGE_SIZE / sizeof(long)) 492 i = 0; 493 client_memory[i] = 0; 494} 495 496static int calibration_count = 10000; 497static int calibration_usec; 498static void * 499calibrate_client_work(void) 500{ 501 long dummy; 502 struct timeval nowtv; 503 struct timeval warmuptv = { 0, 100 * 1000 }; /* 100ms */ 504 struct timeval starttv; 505 struct timeval endtv; 506 507 if (client_spin) { 508 /* Warm-up the stepper first... */ 509 gettimeofday(&nowtv, NULL); 510 timeradd(&nowtv, &warmuptv, &endtv); 511 do { 512 client_spin_loop(calibration_count, client_work_atom); 513 gettimeofday(&nowtv, NULL); 514 } while (timercmp(&nowtv, &endtv, < )); 515 516 /* Now do the calibration */ 517 while (TRUE) { 518 gettimeofday(&starttv, NULL); 519 client_spin_loop(calibration_count, client_work_atom); 520 gettimeofday(&endtv, NULL); 521 if (endtv.tv_sec - starttv.tv_sec > 1) { 522 calibration_count /= 10; 523 continue; 524 } 525 calibration_usec = endtv.tv_usec - starttv.tv_usec; 526 if (endtv.tv_usec < starttv.tv_usec) { 527 calibration_usec += 1000000; 528 } 529 if (calibration_usec < 1000) { 530 calibration_count *= 10; 531 continue; 532 } 533 calibration_count /= calibration_usec; 534 break; 535 } 536 if (verbose) 537 printf("calibration_count=%d calibration_usec=%d\n", 538 calibration_count, calibration_usec); 539 } 540 return NULL; 541} 542 543static void * 544client_work(void) 545{ 546 547 if (client_spin) { 548 client_spin_loop(calibration_count*client_spin, 549 client_work_atom); 550 } 551 552 if (client_delay) { 553 usleep(client_delay); 554 } 555 return NULL; 556} 557 558void *client(void *threadarg) 559{ 560 struct port_args args; 561 int idx; 562 mach_msg_header_t *req, *reply; 563 mach_port_t bsport, servport; 564 kern_return_t ret; 565 int server_num = (int) threadarg; 566 void *ints = malloc(sizeof(u_int32_t) * num_ints); 567 568 if (verbose) 569 printf("client(%d) started, server port name %s\n", 570 server_num, server_port_name[server_num]); 571 572 args.server_num = server_num; 573 thread_setup(server_num + 1); 574 575 /* find server port */ 576 ret = task_get_bootstrap_port(mach_task_self(), &bsport); 577 if (KERN_SUCCESS != ret) { 578 mach_error("task_get_bootstrap_port(): ", ret); 579 exit(1); 580 } 581 ret = bootstrap_look_up(bsport, 582 server_port_name[server_num], 583 &servport); 584 if (KERN_SUCCESS != ret) { 585 mach_error("bootstrap_look_up(): ", ret); 586 exit(1); 587 } 588 589 setup_client_ports(&args); 590 591 /* Allocate and touch memory */ 592 if (client_pages) { 593 unsigned i; 594 client_memory = (long *) malloc(client_pages * PAGE_SIZE); 595 for (i = 0; i < client_pages; i++) 596 client_memory[i * PAGE_SIZE / sizeof(long)] = 0; 597 } 598 599 /* start message loop */ 600 for (idx = 0; idx < num_msgs; idx++) { 601 req = args.req_msg; 602 reply = args.reply_msg; 603 604 req->msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 605 MACH_MSG_TYPE_MAKE_SEND); 606 req->msgh_size = args.req_size; 607 req->msgh_remote_port = servport; 608 req->msgh_local_port = args.port; 609 req->msgh_id = oneway ? 0 : 1; 610 if (msg_type == msg_type_complex) { 611 (req)->msgh_bits |= MACH_MSGH_BITS_COMPLEX; 612 ((ipc_complex_message *)req)->body.msgh_descriptor_count = 1; 613 ((ipc_complex_message *)req)->descriptor.address = ints; 614 ((ipc_complex_message *)req)->descriptor.size = 615 num_ints * sizeof(u_int32_t); 616 ((ipc_complex_message *)req)->descriptor.deallocate = FALSE; 617 ((ipc_complex_message *)req)->descriptor.copy = MACH_MSG_VIRTUAL_COPY; 618 ((ipc_complex_message *)req)->descriptor.type = MACH_MSG_OOL_DESCRIPTOR; 619 } 620 if (verbose) 621 printf("client sending message %d\n", idx); 622 ret = mach_msg(req, 623 MACH_SEND_MSG, 624 args.req_size, 625 0, 626 MACH_PORT_NULL, 627 MACH_MSG_TIMEOUT_NONE, 628 MACH_PORT_NULL); 629 if (MACH_MSG_SUCCESS != ret) { 630 mach_error("mach_msg (send): ", ret); 631 fprintf(stderr, "bailing after %u iterations\n", idx); 632 exit(1); 633 break; 634 } 635 if (!oneway) { 636 if (verbose) 637 printf("client awaiting reply %d\n", idx); 638 reply->msgh_bits = 0; 639 reply->msgh_size = args.reply_size; 640 reply->msgh_local_port = args.port; 641 ret = mach_msg(args.reply_msg, 642 MACH_RCV_MSG|MACH_RCV_INTERRUPT, 643 0, 644 args.reply_size, 645 args.port, 646 MACH_MSG_TIMEOUT_NONE, 647 MACH_PORT_NULL); 648 if (MACH_MSG_SUCCESS != ret) { 649 mach_error("mach_msg (receive): ", ret); 650 fprintf(stderr, "bailing after %u iterations\n", 651 idx); 652 exit(1); 653 } 654 if (verbose) 655 printf("client received reply %d\n", idx); 656 } 657 658 client_work(); 659 } 660 661 free(ints); 662 return NULL; 663} 664 665static void 666thread_spawn(thread_id_t *thread, void *(fn)(void *), void *arg) { 667 if (threaded) { 668 kern_return_t ret; 669 ret = pthread_create( 670 &thread->tid, 671 NULL, 672 fn, 673 arg); 674 if (ret != 0) 675 err(1, "pthread_create()"); 676 if (verbose) 677 printf("created pthread %p\n", thread->tid); 678 } else { 679 thread->pid = fork(); 680 if (thread->pid == 0) { 681 if (verbose) 682 printf("calling %p(%p)\n", fn, arg); 683 fn(arg); 684 exit(0); 685 } 686 if (verbose) 687 printf("forked pid %d\n", thread->pid); 688 } 689} 690 691static void 692thread_join(thread_id_t *thread) { 693 if (threaded) { 694 kern_return_t ret; 695 if (verbose) 696 printf("joining thread %p\n", thread->tid); 697 ret = pthread_join(thread->tid, NULL); 698 if (ret != KERN_SUCCESS) 699 err(1, "pthread_join(%p)", thread->tid); 700 } else { 701 int stat; 702 if (verbose) 703 printf("waiting for pid %d\n", thread->pid); 704 waitpid(thread->pid, &stat, 0); 705 } 706} 707 708static void 709wait_for_servers(void) 710{ 711 int i; 712 int retry_count = 10; 713 mach_port_t bsport, servport; 714 kern_return_t ret; 715 716 /* find server port */ 717 ret = task_get_bootstrap_port(mach_task_self(), &bsport); 718 if (KERN_SUCCESS != ret) { 719 mach_error("task_get_bootstrap_port(): ", ret); 720 exit(1); 721 } 722 723 while (retry_count-- > 0) { 724 for (i = 0; i < num_servers; i++) { 725 ret = bootstrap_look_up(bsport, 726 server_port_name[i], 727 &servport); 728 if (ret != KERN_SUCCESS) { 729 break; 730 } 731 } 732 if (ret == KERN_SUCCESS) 733 return; 734 usleep(100 * 1000); /* 100ms */ 735 } 736 fprintf(stderr, "Server(s) failed to register\n"); 737 exit(1); 738} 739 740int main(int argc, char *argv[]) 741{ 742 int i; 743 int j; 744 thread_id_t *client_id; 745 thread_id_t *server_id; 746 747 signal(SIGINT, signal_handler); 748 parse_args(argc, argv); 749 750 calibrate_client_work(); 751 752 /* 753 * If we're using affinity create an empty namespace now 754 * so this is shared by all our offspring. 755 */ 756 if (affinity) 757 thread_setup(0); 758 759 server_id = (thread_id_t *) malloc(num_servers * sizeof(thread_id_t)); 760 server_port_name = (char **) malloc(num_servers * sizeof(char *)); 761 if (verbose) 762 printf("creating %d servers\n", num_servers); 763 for (i = 0; i < num_servers; i++) { 764 server_port_name[i] = (char *) malloc(sizeof("PORT.pppppp.xx")); 765 /* PORT names include pid of main process for disambiguation */ 766 sprintf(server_port_name[i], "PORT.%06d.%02d", getpid(), i); 767 thread_spawn(&server_id[i], server, (void *) (long) i); 768 } 769 770 int totalclients = num_servers * num_clients; 771 int totalmsg = num_msgs * totalclients; 772 struct timeval starttv, endtv, deltatv; 773 774 /* 775 * Wait for all servers to have registered all ports before starting 776 * the clients and the clock. 777 */ 778 wait_for_servers(); 779 780 printf("%d server%s, %d client%s per server (%d total) %u messages...", 781 num_servers, (num_servers > 1)? "s" : "", 782 num_clients, (num_clients > 1)? "s" : "", 783 totalclients, 784 totalmsg); 785 fflush(stdout); 786 787 /* Call gettimeofday() once and throw away result; some implementations 788 * (like Mach's) cache some time zone info on first call. 789 */ 790 gettimeofday(&starttv, NULL); 791 gettimeofday(&starttv, NULL); 792 793 client_id = (thread_id_t *) malloc(totalclients * sizeof(thread_id_t)); 794 if (verbose) 795 printf("creating %d clients\n", totalclients); 796 for (i = 0; i < num_servers; i++) { 797 for (j = 0; j < num_clients; j++) { 798 thread_spawn( 799 &client_id[(i*num_clients) + j], 800 client, 801 (void *) (long) i); 802 } 803 } 804 805 /* Wait for servers to complete */ 806 for (i = 0; i < num_servers; i++) { 807 thread_join(&server_id[i]); 808 } 809 810 gettimeofday(&endtv, NULL); 811 812 for (i = 0; i < totalclients; i++) { 813 thread_join(&client_id[i]); 814 } 815 816 /* report results */ 817 deltatv.tv_sec = endtv.tv_sec - starttv.tv_sec; 818 deltatv.tv_usec = endtv.tv_usec - starttv.tv_usec; 819 if (endtv.tv_usec < starttv.tv_usec) { 820 deltatv.tv_sec--; 821 deltatv.tv_usec += 1000000; 822 } 823 824 double dsecs = (double) deltatv.tv_sec + 825 1.0E-6 * (double) deltatv.tv_usec; 826 827 printf(" in %ld.%03u seconds\n", 828 (long)deltatv.tv_sec, deltatv.tv_usec/1000); 829 printf(" throughput in messages/sec: %g\n", 830 (double)totalmsg / dsecs); 831 printf(" average message latency (usec): %2.3g\n", 832 dsecs * 1.0E6 / (double) totalmsg); 833 834 return (0); 835 836} 837