1#include <AvailabilityMacros.h> 2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h> 4#endif 5#include <mach/mach.h> 6#include <mach/mach_error.h> 7#include <mach/mach_time.h> 8#include <pthread.h> 9#include <sys/queue.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <unistd.h> 14#include <err.h> 15 16/* 17 * Pool is another multithreaded test/benchmarking program to evaluate 18 * affinity set placement in Leopard. 19 * 20 * The basic picture is: 21 * 22 * -> producer -- -> consumer -- 23 * free / \ work / \ 24 * -> queue -- ... --> queue -- -- 25 * | \ / \ / | 26 * | -> producer -- -> consumer -- | 27 * --------------------------------------------------------------- 28 * 29 * <---------- "stage" ---------> <---------- "stage" ---------> 30 * 31 * There are a series of work stages. Each stage has an input and an output 32 * queue and multiple threads. The first stage is the producer and subsequent 33 * stages are consumers. By defuaut there are 2 stages. There are N producer 34 * and M consumer threads. The are B buffers per producer threads circulating 35 * through the system. 36 * 37 * When affinity is enabled, each producer thread is tagged with an affinity tag 38 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to 39 * the work queue it is tagged with this affinity. When a consumer dequeues a 40 * work item, it sets its affinity to this tag. Hence consumer threads migrate 41 * to the same affinity set where the data was produced. 42 * 43 * Buffer management uses pthread mutex/condition variables. A thread blocks 44 * when no buffer is available on a queue and it is signaled when a buffer 45 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>. 46 * The queue management is centralized in a single routine: what queues to 47 * use as input and output and what function to call for processing is 48 * data-driven. 49 */ 50 51pthread_mutex_t funnel; 52pthread_cond_t barrier; 53 54uint64_t timer; 55int threads; 56int threads_ready = 0; 57 58int iterations = 10000; 59boolean_t affinity = FALSE; 60boolean_t halting = FALSE; 61int verbosity = 1; 62 63typedef struct work { 64 TAILQ_ENTRY(work) link; 65 int *data; 66 int isize; 67 int tag; 68 int number; 69} work_t; 70 71/* 72 * A work queue, complete with pthread objects for its management 73 */ 74typedef struct work_queue { 75 pthread_mutex_t mtx; 76 pthread_cond_t cnd; 77 TAILQ_HEAD(, work) queue; 78 unsigned int waiters; 79} work_queue_t; 80 81/* Worker functions take a integer array and size */ 82typedef void (worker_fn_t)(int *, int); 83 84/* This struct controls the function of a stage */ 85#define WORKERS_MAX 10 86typedef struct { 87 int stagenum; 88 char *name; 89 worker_fn_t *fn; 90 work_queue_t *input; 91 work_queue_t *output; 92 work_queue_t bufq; 93 int work_todo; 94} stage_info_t; 95 96/* This defines a worker thread */ 97typedef struct worker_info { 98 int setnum; 99 stage_info_t *stage; 100 pthread_t thread; 101} worker_info_t; 102 103#define DBG(x...) do { \ 104 if (verbosity > 1) { \ 105 pthread_mutex_lock(&funnel); \ 106 printf(x); \ 107 pthread_mutex_unlock(&funnel); \ 108 } \ 109} while (0) 110 111#define mutter(x...) do { \ 112 if (verbosity > 0) { \ 113 printf(x); \ 114 } \ 115} while (0) 116 117#define s_if_plural(x) (((x) > 1) ? "s" : "") 118 119static void 120usage() 121{ 122 fprintf(stderr, 123#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 124 "usage: pool [-a] Turn affinity on (off)\n" 125 " [-b B] Number of buffers per producer (2)\n" 126#else 127 "usage: pool [-b B] Number of buffers per producer (2)\n" 128#endif 129 " [-i I] Number of buffers to produce (10000)\n" 130 " [-s S] Number of stages (2)\n" 131 " [-p P] Number of pages per buffer (256=1MB)]\n" 132 " [-w] Consumer writes data\n" 133 " [-v V] Verbosity level 0..2 (1)\n" 134 " [N [M]] Number of producer and consumers (2)\n" 135 ); 136 exit(1); 137} 138 139/* Trivial producer: write to each byte */ 140void 141writer_fn(int *data, int isize) 142{ 143 int i; 144 145 for (i = 0; i < isize; i++) { 146 data[i] = i; 147 } 148} 149 150/* Trivial consumer: read each byte */ 151void 152reader_fn(int *data, int isize) 153{ 154 int i; 155 int datum; 156 157 for (i = 0; i < isize; i++) { 158 datum = data[i]; 159 } 160} 161 162/* Consumer reading and writing the buffer */ 163void 164reader_writer_fn(int *data, int isize) 165{ 166 int i; 167 168 for (i = 0; i < isize; i++) { 169 data[i] += 1; 170 } 171} 172 173void 174affinity_set(int tag) 175{ 176#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 177 kern_return_t ret; 178 thread_affinity_policy_data_t policy; 179 if (affinity) { 180 policy.affinity_tag = tag; 181 ret = thread_policy_set( 182 mach_thread_self(), THREAD_AFFINITY_POLICY, 183 (thread_policy_t) &policy, 184 THREAD_AFFINITY_POLICY_COUNT); 185 if (ret != KERN_SUCCESS) 186 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); 187 } 188#endif 189} 190 191/* 192 * This is the central function for every thread. 193 * For each invocation, its role is ets by (a pointer to) a stage_info_t. 194 */ 195void * 196manager_fn(void *arg) 197{ 198 worker_info_t *wp = (worker_info_t *) arg; 199 stage_info_t *sp = wp->stage; 200 boolean_t is_producer = (sp->stagenum == 0); 201 long iteration = 0; 202 int current_tag = 0; 203 204#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 205 kern_return_t ret; 206 thread_extended_policy_data_t epolicy; 207 epolicy.timeshare = FALSE; 208 ret = thread_policy_set( 209 mach_thread_self(), THREAD_EXTENDED_POLICY, 210 (thread_policy_t) &epolicy, 211 THREAD_EXTENDED_POLICY_COUNT); 212 if (ret != KERN_SUCCESS) 213 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); 214 215#endif 216 /* 217 * If we're using affinity sets and we're a producer 218 * set our tag to by our thread set number. 219 */ 220 if (affinity && is_producer) { 221 affinity_set(wp->setnum); 222 current_tag = wp->setnum; 223 } 224 225 DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum); 226 227 /* 228 * Start barrier. 229 * The tets thread to get here releases everyone and starts the timer. 230 */ 231 pthread_mutex_lock(&funnel); 232 threads_ready++; 233 if (threads_ready == threads) { 234 pthread_mutex_unlock(&funnel); 235 if (halting) { 236 printf(" all threads ready for process %d, " 237 "hit any key to start", getpid()); 238 fflush(stdout); 239 (void) getchar(); 240 } 241 pthread_cond_broadcast(&barrier); 242 timer = mach_absolute_time(); 243 } else { 244 pthread_cond_wait(&barrier, &funnel); 245 pthread_mutex_unlock(&funnel); 246 } 247 248 do { 249 work_t *workp; 250 251 /* 252 * Get a buffer from the input queue. 253 * Block if none. 254 * Quit if all work done. 255 */ 256 pthread_mutex_lock(&sp->input->mtx); 257 while (1) { 258 if (sp->work_todo == 0) { 259 pthread_mutex_unlock(&sp->input->mtx); 260 goto out; 261 } 262 workp = TAILQ_FIRST(&(sp->input->queue)); 263 if (workp != NULL) 264 break; 265 DBG(" %s[%d,%d] todo %d waiting for buffer\n", 266 sp->name, wp->setnum, sp->stagenum, sp->work_todo); 267 sp->input->waiters++; 268 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx); 269 sp->input->waiters--; 270 } 271 TAILQ_REMOVE(&(sp->input->queue), workp, link); 272 iteration = sp->work_todo--; 273 pthread_mutex_unlock(&sp->input->mtx); 274 275 if (is_producer) { 276 workp->number = iteration; 277 workp->tag = wp->setnum; 278 } else { 279 if (affinity && current_tag != workp->tag) { 280 affinity_set(workp->tag); 281 current_tag = workp->tag; 282 } 283 } 284 285 DBG(" %s[%d,%d] todo %d work %p data %p\n", 286 sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data); 287 288 /* Do our stuff with the buffer */ 289 (void) sp->fn(workp->data, workp->isize); 290 291 /* 292 * Place the buffer on the input queue of the next stage. 293 * Signal waiters if required. 294 */ 295 pthread_mutex_lock(&sp->output->mtx); 296 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link); 297 if (sp->output->waiters) { 298 DBG(" %s[%d,%d] todo %d signaling work\n", 299 sp->name, wp->setnum, sp->stagenum, iteration); 300 pthread_cond_signal(&sp->output->cnd); 301 } 302 pthread_mutex_unlock(&sp->output->mtx); 303 304 } while (1); 305 306out: 307 pthread_cond_broadcast(&sp->output->cnd); 308 309 DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum); 310 311 return (void *) iteration; 312} 313 314void (*producer_fnp)(int *data, int isize) = &writer_fn; 315void (*consumer_fnp)(int *data, int isize) = &reader_fn; 316 317int 318main(int argc, char *argv[]) 319{ 320 int i; 321 int j; 322 int k; 323 int pages = 256; /* 1MB */ 324 int buffers = 2; 325 int producers = 2; 326 int consumers = 2; 327 int stages = 2; 328 int *status; 329 stage_info_t *stage_info; 330 stage_info_t *sp; 331 worker_info_t *worker_info; 332 worker_info_t *wp; 333 kern_return_t ret; 334 int c; 335 336 /* Do switch parsing: */ 337 while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) { 338 switch (c) { 339 case 'a': 340#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 341 affinity = !affinity; 342 break; 343#else 344 usage(); 345#endif 346 case 'b': 347 buffers = atoi(optarg); 348 break; 349 case 'i': 350 iterations = atoi(optarg); 351 break; 352 case 'p': 353 pages = atoi(optarg); 354 break; 355 case 's': 356 stages = atoi(optarg); 357 if (stages >= WORKERS_MAX) 358 usage(); 359 break; 360 case 't': 361 halting = TRUE; 362 break; 363 case 'w': 364 consumer_fnp = &reader_writer_fn; 365 break; 366 case 'v': 367 verbosity = atoi(optarg); 368 break; 369 case 'h': 370 case '?': 371 default: 372 usage(); 373 } 374 } 375 argc -= optind; argv += optind; 376 if (argc > 0) 377 producers = atoi(*argv); 378 argc--; argv++; 379 if (argc > 0) 380 consumers = atoi(*argv); 381 382 pthread_mutex_init(&funnel, NULL); 383 pthread_cond_init(&barrier, NULL); 384 385 /* 386 * Fire up the worker threads. 387 */ 388 threads = consumers * (stages - 1) + producers; 389 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n" 390 " with %saffinity, consumer reads%s data\n", 391 producers, s_if_plural(producers), 392 stages - 1, s_if_plural(stages - 1), 393 consumers, s_if_plural(consumers), 394 affinity? "": "no ", 395 (consumer_fnp == &reader_writer_fn)? " and writes" : ""); 396 if (pages < 256) 397 mutter(" %dkB bytes per buffer, ", pages * 4); 398 else 399 mutter(" %dMB bytes per buffer, ", pages / 256); 400 mutter("%d buffer%s per producer ", 401 buffers, s_if_plural(buffers)); 402 if (buffers * pages < 256) 403 mutter("(total %dkB)\n", buffers * pages * 4); 404 else 405 mutter("(total %dMB)\n", buffers * pages / 256); 406 mutter(" processing %d buffer%s...\n", 407 iterations, s_if_plural(iterations)); 408 409 stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t)); 410 worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t)); 411 412 /* Set up the queue for the workers of this thread set: */ 413 for (i = 0; i < stages; i++) { 414 sp = &stage_info[i]; 415 sp->stagenum = i; 416 pthread_mutex_init(&sp->bufq.mtx, NULL); 417 pthread_cond_init(&sp->bufq.cnd, NULL); 418 TAILQ_INIT(&sp->bufq.queue); 419 sp->bufq.waiters = 0; 420 if (i == 0) { 421 sp->fn = producer_fnp; 422 sp->name = "producer"; 423 } else { 424 sp->fn = consumer_fnp; 425 sp->name = "consumer"; 426 } 427 sp->input = &sp->bufq; 428 sp->output = &stage_info[(i + 1) % stages].bufq; 429 stage_info[i].work_todo = iterations; 430 } 431 432 /* Create the producers */ 433 for (i = 0; i < producers; i++) { 434 work_t *work_array; 435 int *data; 436 int isize; 437 438 isize = pages * 4096 / sizeof(int); 439 data = (int *) malloc(buffers * pages * 4096); 440 441 /* Set up the empty work buffers */ 442 work_array = (work_t *) malloc(buffers * sizeof(work_t)); 443 for (j = 0; j < buffers; j++) { 444 work_array[j].data = data + (isize * j); 445 work_array[j].isize = isize; 446 work_array[j].tag = 0; 447 TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link); 448 DBG(" empty work item %p for data %p\n", 449 &work_array[j], work_array[j].data); 450 } 451 wp = &worker_info[i]; 452 wp->setnum = i + 1; 453 wp->stage = &stage_info[0]; 454 if (ret = pthread_create(&wp->thread, 455 NULL, 456 &manager_fn, 457 (void *) wp)) 458 err(1, "pthread_create %d,%d", 0, i); 459 } 460 461 /* Create consumers */ 462 for (i = 1; i < stages; i++) { 463 for (j = 0; j < consumers; j++) { 464 wp = &worker_info[producers + (consumers*(i-1)) + j]; 465 wp->setnum = j + 1; 466 wp->stage = &stage_info[i]; 467 if (ret = pthread_create(&wp->thread, 468 NULL, 469 &manager_fn, 470 (void *) wp)) 471 err(1, "pthread_create %d,%d", i, j); 472 } 473 } 474 475 /* 476 * We sit back anf wait for the slaves to finish. 477 */ 478 for (k = 0; k < threads; k++) { 479 int i; 480 int j; 481 482 wp = &worker_info[k]; 483 if (k < producers) { 484 i = 0; 485 j = k; 486 } else { 487 i = (k - producers) / consumers; 488 j = (k - producers) % consumers; 489 } 490 if(ret = pthread_join(wp->thread, (void **)&status)) 491 err(1, "pthread_join %d,%d", i, j); 492 DBG("Thread %d,%d status %d\n", i, j, status); 493 } 494 495 /* 496 * See how long the work took. 497 */ 498 timer = mach_absolute_time() - timer; 499 timer = timer / 1000000ULL; 500 printf("%d.%03d seconds elapsed.\n", 501 (int) (timer/1000ULL), (int) (timer % 1000ULL)); 502 503 return 0; 504} 505