1#include <AvailabilityMacros.h> 2#include <mach/thread_policy.h> 3#include <mach/mach.h> 4#include <mach/mach_error.h> 5#include <mach/mach_time.h> 6#include <pthread.h> 7#include <sys/queue.h> 8#include <stdio.h> 9#include <stdlib.h> 10#include <string.h> 11#include <unistd.h> 12#include <err.h> 13 14/* 15 * Pool is another multithreaded test/benchmarking program to evaluate 16 * affinity set placement in Leopard. 17 * 18 * The basic picture is: 19 * 20 * -> producer -- -> consumer -- 21 * free / \ work / \ 22 * -> queue -- ... --> queue -- -- 23 * | \ / \ / | 24 * | -> producer -- -> consumer -- | 25 * --------------------------------------------------------------- 26 * 27 * <---------- "stage" ---------> <---------- "stage" ---------> 28 * 29 * There are a series of work stages. Each stage has an input and an output 30 * queue and multiple threads. The first stage is the producer and subsequent 31 * stages are consumers. By defuaut there are 2 stages. There are N producer 32 * and M consumer threads. The are B buffers per producer threads circulating 33 * through the system. 34 * 35 * When affinity is enabled, each producer thread is tagged with an affinity tag 36 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to 37 * the work queue it is tagged with this affinity. When a consumer dequeues a 38 * work item, it sets its affinity to this tag. Hence consumer threads migrate 39 * to the same affinity set where the data was produced. 40 * 41 * Buffer management uses pthread mutex/condition variables. A thread blocks 42 * when no buffer is available on a queue and it is signaled when a buffer 43 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>. 44 * The queue management is centralized in a single routine: what queues to 45 * use as input and output and what function to call for processing is 46 * data-driven. 47 */ 48 49pthread_mutex_t funnel; 50pthread_cond_t barrier; 51 52uint64_t timer; 53int threads; 54int threads_ready = 0; 55 56int iterations = 10000; 57boolean_t affinity = FALSE; 58boolean_t halting = FALSE; 59int verbosity = 1; 60 61typedef struct work { 62 TAILQ_ENTRY(work) link; 63 int *data; 64 int isize; 65 int tag; 66 int number; 67} work_t; 68 69/* 70 * A work queue, complete with pthread objects for its management 71 */ 72typedef struct work_queue { 73 pthread_mutex_t mtx; 74 pthread_cond_t cnd; 75 TAILQ_HEAD(, work) queue; 76 unsigned int waiters; 77} work_queue_t; 78 79/* Worker functions take a integer array and size */ 80typedef void (worker_fn_t)(int *, int); 81 82/* This struct controls the function of a stage */ 83#define WORKERS_MAX 10 84typedef struct { 85 int stagenum; 86 char *name; 87 worker_fn_t *fn; 88 work_queue_t *input; 89 work_queue_t *output; 90 work_queue_t bufq; 91 int work_todo; 92} stage_info_t; 93 94/* This defines a worker thread */ 95typedef struct worker_info { 96 int setnum; 97 stage_info_t *stage; 98 pthread_t thread; 99} worker_info_t; 100 101#define DBG(x...) do { \ 102 if (verbosity > 1) { \ 103 pthread_mutex_lock(&funnel); \ 104 printf(x); \ 105 pthread_mutex_unlock(&funnel); \ 106 } \ 107} while (0) 108 109#define mutter(x...) do { \ 110 if (verbosity > 0) { \ 111 printf(x); \ 112 } \ 113} while (0) 114 115#define s_if_plural(x) (((x) > 1) ? "s" : "") 116 117static void 118usage() 119{ 120 fprintf(stderr, 121 "usage: pool [-a] Turn affinity on (off)\n" 122 " [-b B] Number of buffers per producer (2)\n" 123 " [-i I] Number of buffers to produce (10000)\n" 124 " [-s S] Number of stages (2)\n" 125 " [-p P] Number of pages per buffer (256=1MB)]\n" 126 " [-w] Consumer writes data\n" 127 " [-v V] Verbosity level 0..2 (1)\n" 128 " [N [M]] Number of producer and consumers (2)\n" 129 ); 130 exit(1); 131} 132 133/* Trivial producer: write to each byte */ 134void 135writer_fn(int *data, int isize) 136{ 137 int i; 138 139 for (i = 0; i < isize; i++) { 140 data[i] = i; 141 } 142} 143 144/* Trivial consumer: read each byte */ 145void 146reader_fn(int *data, int isize) 147{ 148 int i; 149 int datum; 150 151 for (i = 0; i < isize; i++) { 152 datum = data[i]; 153 } 154} 155 156/* Consumer reading and writing the buffer */ 157void 158reader_writer_fn(int *data, int isize) 159{ 160 int i; 161 162 for (i = 0; i < isize; i++) { 163 data[i] += 1; 164 } 165} 166 167void 168affinity_set(int tag) 169{ 170 kern_return_t ret; 171 thread_affinity_policy_data_t policy; 172 if (affinity) { 173 policy.affinity_tag = tag; 174 ret = thread_policy_set( 175 mach_thread_self(), THREAD_AFFINITY_POLICY, 176 (thread_policy_t) &policy, 177 THREAD_AFFINITY_POLICY_COUNT); 178 if (ret != KERN_SUCCESS) 179 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); 180 } 181} 182 183/* 184 * This is the central function for every thread. 185 * For each invocation, its role is ets by (a pointer to) a stage_info_t. 186 */ 187void * 188manager_fn(void *arg) 189{ 190 worker_info_t *wp = (worker_info_t *) arg; 191 stage_info_t *sp = wp->stage; 192 boolean_t is_producer = (sp->stagenum == 0); 193 long iteration = 0; 194 int current_tag = 0; 195 196 kern_return_t ret; 197 thread_extended_policy_data_t epolicy; 198 epolicy.timeshare = FALSE; 199 ret = thread_policy_set( 200 mach_thread_self(), THREAD_EXTENDED_POLICY, 201 (thread_policy_t) &epolicy, 202 THREAD_EXTENDED_POLICY_COUNT); 203 if (ret != KERN_SUCCESS) 204 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); 205 206 /* 207 * If we're using affinity sets and we're a producer 208 * set our tag to by our thread set number. 209 */ 210 if (affinity && is_producer) { 211 affinity_set(wp->setnum); 212 current_tag = wp->setnum; 213 } 214 215 DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum); 216 217 /* 218 * Start barrier. 219 * The tets thread to get here releases everyone and starts the timer. 220 */ 221 pthread_mutex_lock(&funnel); 222 threads_ready++; 223 if (threads_ready == threads) { 224 pthread_mutex_unlock(&funnel); 225 if (halting) { 226 printf(" all threads ready for process %d, " 227 "hit any key to start", getpid()); 228 fflush(stdout); 229 (void) getchar(); 230 } 231 pthread_cond_broadcast(&barrier); 232 timer = mach_absolute_time(); 233 } else { 234 pthread_cond_wait(&barrier, &funnel); 235 pthread_mutex_unlock(&funnel); 236 } 237 238 do { 239 work_t *workp; 240 241 /* 242 * Get a buffer from the input queue. 243 * Block if none. 244 * Quit if all work done. 245 */ 246 pthread_mutex_lock(&sp->input->mtx); 247 while (1) { 248 if (sp->work_todo == 0) { 249 pthread_mutex_unlock(&sp->input->mtx); 250 goto out; 251 } 252 workp = TAILQ_FIRST(&(sp->input->queue)); 253 if (workp != NULL) 254 break; 255 DBG(" %s[%d,%d] todo %d waiting for buffer\n", 256 sp->name, wp->setnum, sp->stagenum, sp->work_todo); 257 sp->input->waiters++; 258 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx); 259 sp->input->waiters--; 260 } 261 TAILQ_REMOVE(&(sp->input->queue), workp, link); 262 iteration = sp->work_todo--; 263 pthread_mutex_unlock(&sp->input->mtx); 264 265 if (is_producer) { 266 workp->number = iteration; 267 workp->tag = wp->setnum; 268 } else { 269 if (affinity && current_tag != workp->tag) { 270 affinity_set(workp->tag); 271 current_tag = workp->tag; 272 } 273 } 274 275 DBG(" %s[%d,%d] todo %d work %p data %p\n", 276 sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data); 277 278 /* Do our stuff with the buffer */ 279 (void) sp->fn(workp->data, workp->isize); 280 281 /* 282 * Place the buffer on the input queue of the next stage. 283 * Signal waiters if required. 284 */ 285 pthread_mutex_lock(&sp->output->mtx); 286 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link); 287 if (sp->output->waiters) { 288 DBG(" %s[%d,%d] todo %d signaling work\n", 289 sp->name, wp->setnum, sp->stagenum, iteration); 290 pthread_cond_signal(&sp->output->cnd); 291 } 292 pthread_mutex_unlock(&sp->output->mtx); 293 294 } while (1); 295 296out: 297 pthread_cond_broadcast(&sp->output->cnd); 298 299 DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum); 300 301 return (void *) iteration; 302} 303 304void (*producer_fnp)(int *data, int isize) = &writer_fn; 305void (*consumer_fnp)(int *data, int isize) = &reader_fn; 306 307int 308main(int argc, char *argv[]) 309{ 310 int i; 311 int j; 312 int k; 313 int pages = 256; /* 1MB */ 314 int buffers = 2; 315 int producers = 2; 316 int consumers = 2; 317 int stages = 2; 318 int *status; 319 stage_info_t *stage_info; 320 stage_info_t *sp; 321 worker_info_t *worker_info; 322 worker_info_t *wp; 323 kern_return_t ret; 324 int c; 325 326 /* Do switch parsing: */ 327 while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) { 328 switch (c) { 329 case 'a': 330 affinity = !affinity; 331 break; 332 case 'b': 333 buffers = atoi(optarg); 334 break; 335 case 'i': 336 iterations = atoi(optarg); 337 break; 338 case 'p': 339 pages = atoi(optarg); 340 break; 341 case 's': 342 stages = atoi(optarg); 343 if (stages >= WORKERS_MAX) 344 usage(); 345 break; 346 case 't': 347 halting = TRUE; 348 break; 349 case 'w': 350 consumer_fnp = &reader_writer_fn; 351 break; 352 case 'v': 353 verbosity = atoi(optarg); 354 break; 355 case 'h': 356 case '?': 357 default: 358 usage(); 359 } 360 } 361 argc -= optind; argv += optind; 362 if (argc > 0) 363 producers = atoi(*argv); 364 argc--; argv++; 365 if (argc > 0) 366 consumers = atoi(*argv); 367 368 pthread_mutex_init(&funnel, NULL); 369 pthread_cond_init(&barrier, NULL); 370 371 /* 372 * Fire up the worker threads. 373 */ 374 threads = consumers * (stages - 1) + producers; 375 mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n" 376 " with %saffinity, consumer reads%s data\n", 377 producers, s_if_plural(producers), 378 stages - 1, s_if_plural(stages - 1), 379 consumers, s_if_plural(consumers), 380 affinity? "": "no ", 381 (consumer_fnp == &reader_writer_fn)? " and writes" : ""); 382 if (pages < 256) 383 mutter(" %dkB bytes per buffer, ", pages * 4); 384 else 385 mutter(" %dMB bytes per buffer, ", pages / 256); 386 mutter("%d buffer%s per producer ", 387 buffers, s_if_plural(buffers)); 388 if (buffers * pages < 256) 389 mutter("(total %dkB)\n", buffers * pages * 4); 390 else 391 mutter("(total %dMB)\n", buffers * pages / 256); 392 mutter(" processing %d buffer%s...\n", 393 iterations, s_if_plural(iterations)); 394 395 stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t)); 396 worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t)); 397 398 /* Set up the queue for the workers of this thread set: */ 399 for (i = 0; i < stages; i++) { 400 sp = &stage_info[i]; 401 sp->stagenum = i; 402 pthread_mutex_init(&sp->bufq.mtx, NULL); 403 pthread_cond_init(&sp->bufq.cnd, NULL); 404 TAILQ_INIT(&sp->bufq.queue); 405 sp->bufq.waiters = 0; 406 if (i == 0) { 407 sp->fn = producer_fnp; 408 sp->name = "producer"; 409 } else { 410 sp->fn = consumer_fnp; 411 sp->name = "consumer"; 412 } 413 sp->input = &sp->bufq; 414 sp->output = &stage_info[(i + 1) % stages].bufq; 415 stage_info[i].work_todo = iterations; 416 } 417 418 /* Create the producers */ 419 for (i = 0; i < producers; i++) { 420 work_t *work_array; 421 int *data; 422 int isize; 423 424 isize = pages * 4096 / sizeof(int); 425 data = (int *) malloc(buffers * pages * 4096); 426 427 /* Set up the empty work buffers */ 428 work_array = (work_t *) malloc(buffers * sizeof(work_t)); 429 for (j = 0; j < buffers; j++) { 430 work_array[j].data = data + (isize * j); 431 work_array[j].isize = isize; 432 work_array[j].tag = 0; 433 TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link); 434 DBG(" empty work item %p for data %p\n", 435 &work_array[j], work_array[j].data); 436 } 437 wp = &worker_info[i]; 438 wp->setnum = i + 1; 439 wp->stage = &stage_info[0]; 440 if (ret = pthread_create(&wp->thread, 441 NULL, 442 &manager_fn, 443 (void *) wp)) 444 err(1, "pthread_create %d,%d", 0, i); 445 } 446 447 /* Create consumers */ 448 for (i = 1; i < stages; i++) { 449 for (j = 0; j < consumers; j++) { 450 wp = &worker_info[producers + (consumers*(i-1)) + j]; 451 wp->setnum = j + 1; 452 wp->stage = &stage_info[i]; 453 if (ret = pthread_create(&wp->thread, 454 NULL, 455 &manager_fn, 456 (void *) wp)) 457 err(1, "pthread_create %d,%d", i, j); 458 } 459 } 460 461 /* 462 * We sit back anf wait for the slaves to finish. 463 */ 464 for (k = 0; k < threads; k++) { 465 int i; 466 int j; 467 468 wp = &worker_info[k]; 469 if (k < producers) { 470 i = 0; 471 j = k; 472 } else { 473 i = (k - producers) / consumers; 474 j = (k - producers) % consumers; 475 } 476 if(ret = pthread_join(wp->thread, (void **)&status)) 477 err(1, "pthread_join %d,%d", i, j); 478 DBG("Thread %d,%d status %d\n", i, j, status); 479 } 480 481 /* 482 * See how long the work took. 483 */ 484 timer = mach_absolute_time() - timer; 485 timer = timer / 1000000ULL; 486 printf("%d.%03d seconds elapsed.\n", 487 (int) (timer/1000ULL), (int) (timer % 1000ULL)); 488 489 return 0; 490} 491