1#include <AvailabilityMacros.h> 2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h> 4#endif 5#include <mach/mach.h> 6#include <mach/mach_error.h> 7#include <mach/mach_time.h> 8#include <pthread.h> 9#include <sys/queue.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <unistd.h> 14#include <err.h> 15#include <errno.h> 16#include <sys/sysctl.h> 17 18/* 19 * Sets is a multithreaded test/benchmarking program to evaluate 20 * affinity set placement in Leopard. 21 * 22 * The picture here, for each set, is: 23 * 24 * free work 25 * -> queue --> producer --> queue --> consumer -- 26 * | | 27 * ----------------------------------------------- 28 * 29 * <------ "stage" -----> <------ "stage" -----> 30 31 * We spin off sets of production line threads (2 sets by default). 32 * All threads of each line sets the same affinity tag (unless disabled). 33 * By default there are 2 stage (worker) threads per production line. 34 * A worker thread removes a buffer from an input queue, processses it and 35 * queues it on an output queue. By default the initial stage (producer) 36 * writes every byte in a buffer and the other (consumer) stages read every 37 * byte. By default the buffers are 1MB (256 pages) in size but this can be 38 * overidden. By default there are 2 buffers per set (again overridable). 39 * Worker threads process (iterate over) 10000 buffers by default. 40 * 41 * With affinity enabled, each producer and consumer thread sets its affinity 42 * to the set number, 1 .. N. So the threads of each set share an L2 cache. 43 * 44 * Buffer management uses pthread mutex/condition variables. A thread blocks 45 * when no buffer is available on a queue and it is signaled when a buffer 46 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>. 47 * The queue management is centralized in a single routine: what queues to 48 * use as input and output and what function to call for processing is 49 * data-driven. 50 */ 51 52pthread_mutex_t funnel; 53pthread_cond_t barrier; 54 55uint64_t timer; 56int threads; 57int threads_ready = 0; 58 59int iterations = 10000; 60boolean_t affinity = FALSE; 61boolean_t halting = FALSE; 62boolean_t cache_config = FALSE; 63int verbosity = 1; 64 65typedef struct work { 66 TAILQ_ENTRY(work) link; 67 int *data; 68} work_t; 69 70/* 71 * A work queue, complete with pthread objects for its management 72 */ 73typedef struct work_queue { 74 pthread_mutex_t mtx; 75 pthread_cond_t cnd; 76 TAILQ_HEAD(, work) queue; 77 boolean_t waiters; 78} work_queue_t; 79 80/* Worker functions take a integer array and size */ 81typedef void (worker_fn_t)(int *, int); 82 83/* This struct controls the function of a thread */ 84typedef struct { 85 int stagenum; 86 char *name; 87 worker_fn_t *fn; 88 work_queue_t *input; 89 work_queue_t *output; 90 struct line_info *set; 91 pthread_t thread; 92 work_queue_t bufq; 93} stage_info_t; 94 95/* This defines a thread set */ 96#define WORKERS_MAX 10 97typedef struct line_info { 98 int setnum; 99 int *data; 100 int isize; 101 stage_info_t *stage[WORKERS_MAX]; 102} line_info_t; 103 104#define DBG(x...) do { \ 105 if (verbosity > 1) { \ 106 pthread_mutex_lock(&funnel); \ 107 printf(x); \ 108 pthread_mutex_unlock(&funnel); \ 109 } \ 110} while (0) 111 112#define mutter(x...) do { \ 113 if (verbosity > 0) { \ 114 printf(x); \ 115 } \ 116} while (0) 117 118#define s_if_plural(x) (((x) > 1) ? "s" : "") 119 120static void 121usage() 122{ 123 fprintf(stderr, 124#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 125 "usage: sets [-a] Turn affinity on (off)\n" 126 " [-b B] Number of buffers per set/line (2)\n" 127#else 128 "usage: sets [-b B] Number of buffers per set/line (2)\n" 129#endif 130 " [-c] Configure for max cache performance\n" 131 " [-h] Print this\n" 132 " [-i I] Number of items/buffers to process (1000)\n" 133 " [-s S] Number of stages per set/line (2)\n" 134 " [-t] Halt for keyboard input to start\n" 135 " [-p P] Number of pages per buffer (256=1MB)]\n" 136 " [-w] Consumer writes data\n" 137 " [-v V] Level of verbosity 0..2 (1)\n" 138 " [N] Number of sets/lines (2)\n" 139 ); 140 exit(1); 141} 142 143/* Trivial producer: write to each byte */ 144void 145writer_fn(int *data, int isize) 146{ 147 int i; 148 149 for (i = 0; i < isize; i++) { 150 data[i] = i; 151 } 152} 153 154/* Trivial consumer: read each byte */ 155void 156reader_fn(int *data, int isize) 157{ 158 int i; 159 int datum; 160 161 for (i = 0; i < isize; i++) { 162 datum = data[i]; 163 } 164} 165 166/* Consumer reading and writing the buffer */ 167void 168reader_writer_fn(int *data, int isize) 169{ 170 int i; 171 172 for (i = 0; i < isize; i++) { 173 data[i] += 1; 174 } 175} 176 177/* 178 * This is the central function for every thread. 179 * For each invocation, its role is ets by (a pointer to) a stage_info_t. 180 */ 181void * 182manager_fn(void *arg) 183{ 184 stage_info_t *sp = (stage_info_t *) arg; 185 line_info_t *lp = sp->set; 186 kern_return_t ret; 187 long iteration = 0; 188 189 /* 190 * If we're using affinity sets (we are by default) 191 * set our tag to by our thread set number. 192 */ 193#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 194 thread_extended_policy_data_t epolicy; 195 thread_affinity_policy_data_t policy; 196 197 epolicy.timeshare = FALSE; 198 ret = thread_policy_set( 199 mach_thread_self(), THREAD_EXTENDED_POLICY, 200 (thread_policy_t) &epolicy, 201 THREAD_EXTENDED_POLICY_COUNT); 202 if (ret != KERN_SUCCESS) 203 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); 204 205 if (affinity) { 206 policy.affinity_tag = lp->setnum; 207 ret = thread_policy_set( 208 mach_thread_self(), THREAD_AFFINITY_POLICY, 209 (thread_policy_t) &policy, 210 THREAD_AFFINITY_POLICY_COUNT); 211 if (ret != KERN_SUCCESS) 212 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); 213 } 214#endif 215 216 DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum); 217 218 /* 219 * Start barrier. 220 * The tets thread to get here releases everyone and starts the timer. 221 */ 222 pthread_mutex_lock(&funnel); 223 threads_ready++; 224 if (threads_ready == threads) { 225 pthread_mutex_unlock(&funnel); 226 if (halting) { 227 printf(" all threads ready for process %d, " 228 "hit any key to start", getpid()); 229 fflush(stdout); 230 (void) getchar(); 231 } 232 pthread_cond_broadcast(&barrier); 233 timer = mach_absolute_time(); 234 } else { 235 pthread_cond_wait(&barrier, &funnel); 236 pthread_mutex_unlock(&funnel); 237 } 238 239 do { 240 int i; 241 work_t *workp; 242 243 /* 244 * Get a buffer from the input queue. 245 * Block if none. 246 */ 247 pthread_mutex_lock(&sp->input->mtx); 248 while (1) { 249 workp = TAILQ_FIRST(&(sp->input->queue)); 250 if (workp != NULL) 251 break; 252 DBG(" %s[%d,%d] iteration %d waiting for buffer\n", 253 sp->name, lp->setnum, sp->stagenum, iteration); 254 sp->input->waiters = TRUE; 255 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx); 256 sp->input->waiters = FALSE; 257 } 258 TAILQ_REMOVE(&(sp->input->queue), workp, link); 259 pthread_mutex_unlock(&sp->input->mtx); 260 261 DBG(" %s[%d,%d] iteration %d work %p data %p\n", 262 sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data); 263 264 /* Do our stuff with the buffer */ 265 (void) sp->fn(workp->data, lp->isize); 266 267 /* 268 * Place the buffer on the input queue. 269 * Signal waiters if required. 270 */ 271 pthread_mutex_lock(&sp->output->mtx); 272 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link); 273 if (sp->output->waiters) { 274 DBG(" %s[%d,%d] iteration %d signaling work\n", 275 sp->name, lp->setnum, sp->stagenum, iteration); 276 pthread_cond_signal(&sp->output->cnd); 277 } 278 pthread_mutex_unlock(&sp->output->mtx); 279 } while (++iteration < iterations); 280 281 DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum); 282 283 return (void *) iteration; 284} 285 286#define MAX_CACHE_DEPTH 10 287static void 288auto_config(int npages, int *nbufs, int *nsets) 289{ 290 size_t len; 291 int ncpu; 292 int llc; 293 int64_t cacheconfig[MAX_CACHE_DEPTH]; 294 int64_t cachesize[MAX_CACHE_DEPTH]; 295 296 mutter("Autoconfiguring...\n"); 297 298 len = sizeof(cacheconfig); 299 if (sysctlbyname("hw.cacheconfig", 300 &cacheconfig[0], &len, NULL, 0) != 0) { 301 printf("Unable to get hw.cacheconfig, %d\n", errno); 302 exit(1); 303 } 304 len = sizeof(cachesize); 305 if (sysctlbyname("hw.cachesize", 306 &cachesize[0], &len, NULL, 0) != 0) { 307 printf("Unable to get hw.cachesize, %d\n", errno); 308 exit(1); 309 } 310 311 /* 312 * Find LLC 313 */ 314 for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--) 315 if (cacheconfig[llc] != 0) 316 break; 317 318 /* 319 * Calculate number of buffers of size pages*4096 bytes 320 * fit into 90% of an L2 cache. 321 */ 322 *nbufs = cachesize[llc] * 9 / (npages * 4096 * 10); 323 mutter(" L%d (LLC) cache %qd bytes: " 324 "using %d buffers of size %d bytes\n", 325 llc, cachesize[llc], *nbufs, (npages * 4096)); 326 327 /* 328 * Calcalute how many sets: 329 */ 330 *nsets = cacheconfig[0]/cacheconfig[llc]; 331 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n", 332 cacheconfig[0], cacheconfig[llc], llc, *nsets); 333} 334 335void (*producer_fnp)(int *data, int isize) = &writer_fn; 336void (*consumer_fnp)(int *data, int isize) = &reader_fn; 337 338int 339main(int argc, char *argv[]) 340{ 341 int i; 342 int j; 343 int pages = 256; /* 1MB */ 344 int buffers = 2; 345 int sets = 2; 346 int stages = 2; 347 int *status; 348 line_info_t *line_info; 349 line_info_t *lp; 350 stage_info_t *stage_info; 351 stage_info_t *sp; 352 kern_return_t ret; 353 int c; 354 355 /* Do switch parsing: */ 356 while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) { 357 switch (c) { 358 case 'a': 359#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 360 affinity = !affinity; 361 break; 362#else 363 usage(); 364#endif 365 case 'b': 366 buffers = atoi(optarg); 367 break; 368 case 'c': 369 cache_config = TRUE; 370 break; 371 case 'i': 372 iterations = atoi(optarg); 373 break; 374 case 'p': 375 pages = atoi(optarg); 376 break; 377 case 's': 378 stages = atoi(optarg); 379 if (stages >= WORKERS_MAX) 380 usage(); 381 break; 382 case 't': 383 halting = TRUE; 384 break; 385 case 'w': 386 consumer_fnp = &reader_writer_fn; 387 break; 388 case 'v': 389 verbosity = atoi(optarg); 390 break; 391 case '?': 392 case 'h': 393 default: 394 usage(); 395 } 396 } 397 argc -= optind; argv += optind; 398 if (argc > 0) 399 sets = atoi(*argv); 400 401 if (cache_config) 402 auto_config(pages, &buffers, &sets); 403 404 pthread_mutex_init(&funnel, NULL); 405 pthread_cond_init(&barrier, NULL); 406 407 /* 408 * Fire up the worker threads. 409 */ 410 threads = sets * stages; 411 mutter("Launching %d set%s of %d threads with %saffinity, " 412 "consumer reads%s data\n", 413 sets, s_if_plural(sets), stages, affinity? "": "no ", 414 (consumer_fnp == &reader_writer_fn)? " and writes" : ""); 415 if (pages < 256) 416 mutter(" %dkB bytes per buffer, ", pages * 4); 417 else 418 mutter(" %dMB bytes per buffer, ", pages / 256); 419 mutter("%d buffer%s per set ", 420 buffers, s_if_plural(buffers)); 421 if (buffers * pages < 256) 422 mutter("(total %dkB)\n", buffers * pages * 4); 423 else 424 mutter("(total %dMB)\n", buffers * pages / 256); 425 mutter(" processing %d buffer%s...\n", 426 iterations, s_if_plural(iterations)); 427 line_info = (line_info_t *) malloc(sets * sizeof(line_info_t)); 428 stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t)); 429 for (i = 0; i < sets; i++) { 430 work_t *work_array; 431 432 lp = &line_info[i]; 433 434 lp->setnum = i + 1; 435 lp->isize = pages * 4096 / sizeof(int); 436 lp->data = (int *) malloc(buffers * pages * 4096); 437 438 /* Set up the queue for the workers of this thread set: */ 439 for (j = 0; j < stages; j++) { 440 sp = &stage_info[(i*stages) + j]; 441 sp->stagenum = j; 442 sp->set = lp; 443 lp->stage[j] = sp; 444 pthread_mutex_init(&sp->bufq.mtx, NULL); 445 pthread_cond_init(&sp->bufq.cnd, NULL); 446 TAILQ_INIT(&sp->bufq.queue); 447 sp->bufq.waiters = FALSE; 448 } 449 450 /* 451 * Take a second pass through the stages 452 * to define what the workers are and to interconnect their input/outputs 453 */ 454 for (j = 0; j < stages; j++) { 455 sp = lp->stage[j]; 456 if (j == 0) { 457 sp->fn = producer_fnp; 458 sp->name = "producer"; 459 } else { 460 sp->fn = consumer_fnp; 461 sp->name = "consumer"; 462 } 463 sp->input = &lp->stage[j]->bufq; 464 sp->output = &lp->stage[(j + 1) % stages]->bufq; 465 } 466 467 /* Set up the buffers on the first worker of the set. */ 468 work_array = (work_t *) malloc(buffers * sizeof(work_t)); 469 for (j = 0; j < buffers; j++) { 470 work_array[j].data = lp->data + (lp->isize * j); 471 TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link); 472 DBG(" empty work item %p for set %d data %p\n", 473 &work_array[j], i, work_array[j].data); 474 } 475 476 /* Create this set of threads */ 477 for (j = 0; j < stages; j++) { 478 if (ret = pthread_create(&lp->stage[j]->thread, NULL, 479 &manager_fn, 480 (void *) lp->stage[j])) 481 err(1, "pthread_create %d,%d", i, j); 482 } 483 } 484 485 /* 486 * We sit back anf wait for the slave to finish. 487 */ 488 for (i = 0; i < sets; i++) { 489 lp = &line_info[i]; 490 for (j = 0; j < stages; j++) { 491 if(ret = pthread_join(lp->stage[j]->thread, (void **)&status)) 492 err(1, "pthread_join %d,%d", i, j); 493 DBG("Thread %d,%d status %d\n", i, j, status); 494 } 495 } 496 497 /* 498 * See how long the work took. 499 */ 500 timer = mach_absolute_time() - timer; 501 timer = timer / 1000000ULL; 502 printf("%d.%03d seconds elapsed.\n", 503 (int) (timer/1000ULL), (int) (timer % 1000ULL)); 504 505 return 0; 506} 507