1#include <AvailabilityMacros.h> 2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h> 4#endif 5#include <mach/mach.h> 6#include <mach/mach_error.h> 7#include <mach/mach_time.h> 8#include <pthread.h> 9#include <sys/queue.h> 10#include <stdio.h> 11#include <stdlib.h> 12#include <string.h> 13#include <unistd.h> 14#include <err.h> 15#include <errno.h> 16 17/* 18 * Sets is a multithreaded test/benchmarking program to evaluate 19 * affinity set placement in Leopard. 20 * 21 * The picture here, for each set, is: 22 * 23 * free work 24 * -> queue --> producer --> queue --> consumer -- 25 * | | 26 * ----------------------------------------------- 27 * 28 * <------ "stage" -----> <------ "stage" -----> 29 30 * We spin off sets of production line threads (2 sets by default). 31 * All threads of each line sets the same affinity tag (unless disabled). 32 * By default there are 2 stage (worker) threads per production line. 33 * A worker thread removes a buffer from an input queue, processses it and 34 * queues it on an output queue. By default the initial stage (producer) 35 * writes every byte in a buffer and the other (consumer) stages read every 36 * byte. By default the buffers are 1MB (256 pages) in size but this can be 37 * overidden. By default there are 2 buffers per set (again overridable). 38 * Worker threads process (iterate over) 10000 buffers by default. 39 * 40 * With affinity enabled, each producer and consumer thread sets its affinity 41 * to the set number, 1 .. N. So the threads of each set share an L2 cache. 42 * 43 * Buffer management uses pthread mutex/condition variables. A thread blocks 44 * when no buffer is available on a queue and it is signaled when a buffer 45 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>. 46 * The queue management is centralized in a single routine: what queues to 47 * use as input and output and what function to call for processing is 48 * data-driven. 49 */ 50 51pthread_mutex_t funnel; 52pthread_cond_t barrier; 53 54uint64_t timer; 55int threads; 56int threads_ready = 0; 57 58int iterations = 10000; 59boolean_t affinity = FALSE; 60boolean_t halting = FALSE; 61boolean_t cache_config = FALSE; 62int verbosity = 1; 63 64typedef struct work { 65 TAILQ_ENTRY(work) link; 66 int *data; 67} work_t; 68 69/* 70 * A work queue, complete with pthread objects for its management 71 */ 72typedef struct work_queue { 73 pthread_mutex_t mtx; 74 pthread_cond_t cnd; 75 TAILQ_HEAD(, work) queue; 76 boolean_t waiters; 77} work_queue_t; 78 79/* Worker functions take a integer array and size */ 80typedef void (worker_fn_t)(int *, int); 81 82/* This struct controls the function of a thread */ 83typedef struct { 84 int stagenum; 85 char *name; 86 worker_fn_t *fn; 87 work_queue_t *input; 88 work_queue_t *output; 89 struct line_info *set; 90 pthread_t thread; 91 work_queue_t bufq; 92} stage_info_t; 93 94/* This defines a thread set */ 95#define WORKERS_MAX 10 96typedef struct line_info { 97 int setnum; 98 int *data; 99 int isize; 100 stage_info_t *stage[WORKERS_MAX]; 101} line_info_t; 102 103#define DBG(x...) do { \ 104 if (verbosity > 1) { \ 105 pthread_mutex_lock(&funnel); \ 106 printf(x); \ 107 pthread_mutex_unlock(&funnel); \ 108 } \ 109} while (0) 110 111#define mutter(x...) do { \ 112 if (verbosity > 0) { \ 113 printf(x); \ 114 } \ 115} while (0) 116 117#define s_if_plural(x) (((x) > 1) ? "s" : "") 118 119static void 120usage() 121{ 122 fprintf(stderr, 123#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 124 "usage: sets [-a] Turn affinity on (off)\n" 125 " [-b B] Number of buffers per set/line (2)\n" 126#else 127 "usage: sets [-b B] Number of buffers per set/line (2)\n" 128#endif 129 " [-c] Configure for max cache performance\n" 130 " [-h] Print this\n" 131 " [-i I] Number of items/buffers to process (1000)\n" 132 " [-s S] Number of stages per set/line (2)\n" 133 " [-t] Halt for keyboard input to start\n" 134 " [-p P] Number of pages per buffer (256=1MB)]\n" 135 " [-w] Consumer writes data\n" 136 " [-v V] Level of verbosity 0..2 (1)\n" 137 " [N] Number of sets/lines (2)\n" 138 ); 139 exit(1); 140} 141 142/* Trivial producer: write to each byte */ 143void 144writer_fn(int *data, int isize) 145{ 146 int i; 147 148 for (i = 0; i < isize; i++) { 149 data[i] = i; 150 } 151} 152 153/* Trivial consumer: read each byte */ 154void 155reader_fn(int *data, int isize) 156{ 157 int i; 158 int datum; 159 160 for (i = 0; i < isize; i++) { 161 datum = data[i]; 162 } 163} 164 165/* Consumer reading and writing the buffer */ 166void 167reader_writer_fn(int *data, int isize) 168{ 169 int i; 170 171 for (i = 0; i < isize; i++) { 172 data[i] += 1; 173 } 174} 175 176/* 177 * This is the central function for every thread. 178 * For each invocation, its role is ets by (a pointer to) a stage_info_t. 179 */ 180void * 181manager_fn(void *arg) 182{ 183 stage_info_t *sp = (stage_info_t *) arg; 184 line_info_t *lp = sp->set; 185 kern_return_t ret; 186 long iteration = 0; 187 188 /* 189 * If we're using affinity sets (we are by default) 190 * set our tag to by our thread set number. 191 */ 192#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 193 thread_extended_policy_data_t epolicy; 194 thread_affinity_policy_data_t policy; 195 196 epolicy.timeshare = FALSE; 197 ret = thread_policy_set( 198 mach_thread_self(), THREAD_EXTENDED_POLICY, 199 (thread_policy_t) &epolicy, 200 THREAD_EXTENDED_POLICY_COUNT); 201 if (ret != KERN_SUCCESS) 202 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); 203 204 if (affinity) { 205 policy.affinity_tag = lp->setnum; 206 ret = thread_policy_set( 207 mach_thread_self(), THREAD_AFFINITY_POLICY, 208 (thread_policy_t) &policy, 209 THREAD_AFFINITY_POLICY_COUNT); 210 if (ret != KERN_SUCCESS) 211 printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); 212 } 213#endif 214 215 DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum); 216 217 /* 218 * Start barrier. 219 * The tets thread to get here releases everyone and starts the timer. 220 */ 221 pthread_mutex_lock(&funnel); 222 threads_ready++; 223 if (threads_ready == threads) { 224 pthread_mutex_unlock(&funnel); 225 if (halting) { 226 printf(" all threads ready for process %d, " 227 "hit any key to start", getpid()); 228 fflush(stdout); 229 (void) getchar(); 230 } 231 pthread_cond_broadcast(&barrier); 232 timer = mach_absolute_time(); 233 } else { 234 pthread_cond_wait(&barrier, &funnel); 235 pthread_mutex_unlock(&funnel); 236 } 237 238 do { 239 int i; 240 work_t *workp; 241 242 /* 243 * Get a buffer from the input queue. 244 * Block if none. 245 */ 246 pthread_mutex_lock(&sp->input->mtx); 247 while (1) { 248 workp = TAILQ_FIRST(&(sp->input->queue)); 249 if (workp != NULL) 250 break; 251 DBG(" %s[%d,%d] iteration %d waiting for buffer\n", 252 sp->name, lp->setnum, sp->stagenum, iteration); 253 sp->input->waiters = TRUE; 254 pthread_cond_wait(&sp->input->cnd, &sp->input->mtx); 255 sp->input->waiters = FALSE; 256 } 257 TAILQ_REMOVE(&(sp->input->queue), workp, link); 258 pthread_mutex_unlock(&sp->input->mtx); 259 260 DBG(" %s[%d,%d] iteration %d work %p data %p\n", 261 sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data); 262 263 /* Do our stuff with the buffer */ 264 (void) sp->fn(workp->data, lp->isize); 265 266 /* 267 * Place the buffer on the input queue. 268 * Signal waiters if required. 269 */ 270 pthread_mutex_lock(&sp->output->mtx); 271 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link); 272 if (sp->output->waiters) { 273 DBG(" %s[%d,%d] iteration %d signaling work\n", 274 sp->name, lp->setnum, sp->stagenum, iteration); 275 pthread_cond_signal(&sp->output->cnd); 276 } 277 pthread_mutex_unlock(&sp->output->mtx); 278 } while (++iteration < iterations); 279 280 DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum); 281 282 return (void *) iteration; 283} 284 285#define MAX_CACHE_DEPTH 10 286static void 287auto_config(int npages, int *nbufs, int *nsets) 288{ 289 int len; 290 int ncpu; 291 int llc; 292 int64_t cacheconfig[MAX_CACHE_DEPTH]; 293 int64_t cachesize[MAX_CACHE_DEPTH]; 294 295 mutter("Autoconfiguring...\n"); 296 297 len = sizeof(cacheconfig); 298 if (sysctlbyname("hw.cacheconfig", 299 &cacheconfig[0], &len, NULL, 0) != 0) { 300 printf("Unable to get hw.cacheconfig, %d\n", errno); 301 exit(1); 302 } 303 len = sizeof(cachesize); 304 if (sysctlbyname("hw.cachesize", 305 &cachesize[0], &len, NULL, 0) != 0) { 306 printf("Unable to get hw.cachesize, %d\n", errno); 307 exit(1); 308 } 309 310 /* 311 * Find LLC 312 */ 313 for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--) 314 if (cacheconfig[llc] != 0) 315 break; 316 317 /* 318 * Calculate number of buffers of size pages*4096 bytes 319 * fit into 90% of an L2 cache. 320 */ 321 *nbufs = cachesize[llc] * 9 / (npages * 4096 * 10); 322 mutter(" L%d (LLC) cache %qd bytes: " 323 "using %d buffers of size %d bytes\n", 324 llc, cachesize[llc], *nbufs, (npages * 4096)); 325 326 /* 327 * Calcalute how many sets: 328 */ 329 *nsets = cacheconfig[0]/cacheconfig[llc]; 330 mutter(" %qd cpus; %qd cpus per L%d cache: using %d sets\n", 331 cacheconfig[0], cacheconfig[llc], llc, *nsets); 332} 333 334void (*producer_fnp)(int *data, int isize) = &writer_fn; 335void (*consumer_fnp)(int *data, int isize) = &reader_fn; 336 337int 338main(int argc, char *argv[]) 339{ 340 int i; 341 int j; 342 int pages = 256; /* 1MB */ 343 int buffers = 2; 344 int sets = 2; 345 int stages = 2; 346 int *status; 347 line_info_t *line_info; 348 line_info_t *lp; 349 stage_info_t *stage_info; 350 stage_info_t *sp; 351 kern_return_t ret; 352 int c; 353 354 /* Do switch parsing: */ 355 while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) { 356 switch (c) { 357 case 'a': 358#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER 359 affinity = !affinity; 360 break; 361#else 362 usage(); 363#endif 364 case 'b': 365 buffers = atoi(optarg); 366 break; 367 case 'c': 368 cache_config = TRUE; 369 break; 370 case 'i': 371 iterations = atoi(optarg); 372 break; 373 case 'p': 374 pages = atoi(optarg); 375 break; 376 case 's': 377 stages = atoi(optarg); 378 if (stages >= WORKERS_MAX) 379 usage(); 380 break; 381 case 't': 382 halting = TRUE; 383 break; 384 case 'w': 385 consumer_fnp = &reader_writer_fn; 386 break; 387 case 'v': 388 verbosity = atoi(optarg); 389 break; 390 case '?': 391 case 'h': 392 default: 393 usage(); 394 } 395 } 396 argc -= optind; argv += optind; 397 if (argc > 0) 398 sets = atoi(*argv); 399 400 if (cache_config) 401 auto_config(pages, &buffers, &sets); 402 403 pthread_mutex_init(&funnel, NULL); 404 pthread_cond_init(&barrier, NULL); 405 406 /* 407 * Fire up the worker threads. 408 */ 409 threads = sets * stages; 410 mutter("Launching %d set%s of %d threads with %saffinity, " 411 "consumer reads%s data\n", 412 sets, s_if_plural(sets), stages, affinity? "": "no ", 413 (consumer_fnp == &reader_writer_fn)? " and writes" : ""); 414 if (pages < 256) 415 mutter(" %dkB bytes per buffer, ", pages * 4); 416 else 417 mutter(" %dMB bytes per buffer, ", pages / 256); 418 mutter("%d buffer%s per set ", 419 buffers, s_if_plural(buffers)); 420 if (buffers * pages < 256) 421 mutter("(total %dkB)\n", buffers * pages * 4); 422 else 423 mutter("(total %dMB)\n", buffers * pages / 256); 424 mutter(" processing %d buffer%s...\n", 425 iterations, s_if_plural(iterations)); 426 line_info = (line_info_t *) malloc(sets * sizeof(line_info_t)); 427 stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t)); 428 for (i = 0; i < sets; i++) { 429 work_t *work_array; 430 431 lp = &line_info[i]; 432 433 lp->setnum = i + 1; 434 lp->isize = pages * 4096 / sizeof(int); 435 lp->data = (int *) malloc(buffers * pages * 4096); 436 437 /* Set up the queue for the workers of this thread set: */ 438 for (j = 0; j < stages; j++) { 439 sp = &stage_info[(i*stages) + j]; 440 sp->stagenum = j; 441 sp->set = lp; 442 lp->stage[j] = sp; 443 pthread_mutex_init(&sp->bufq.mtx, NULL); 444 pthread_cond_init(&sp->bufq.cnd, NULL); 445 TAILQ_INIT(&sp->bufq.queue); 446 sp->bufq.waiters = FALSE; 447 } 448 449 /* 450 * Take a second pass through the stages 451 * to define what the workers are and to interconnect their input/outputs 452 */ 453 for (j = 0; j < stages; j++) { 454 sp = lp->stage[j]; 455 if (j == 0) { 456 sp->fn = producer_fnp; 457 sp->name = "producer"; 458 } else { 459 sp->fn = consumer_fnp; 460 sp->name = "consumer"; 461 } 462 sp->input = &lp->stage[j]->bufq; 463 sp->output = &lp->stage[(j + 1) % stages]->bufq; 464 } 465 466 /* Set up the buffers on the first worker of the set. */ 467 work_array = (work_t *) malloc(buffers * sizeof(work_t)); 468 for (j = 0; j < buffers; j++) { 469 work_array[j].data = lp->data + (lp->isize * j); 470 TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link); 471 DBG(" empty work item %p for set %d data %p\n", 472 &work_array[j], i, work_array[j].data); 473 } 474 475 /* Create this set of threads */ 476 for (j = 0; j < stages; j++) { 477 if (ret = pthread_create(&lp->stage[j]->thread, NULL, 478 &manager_fn, 479 (void *) lp->stage[j])) 480 err(1, "pthread_create %d,%d", i, j); 481 } 482 } 483 484 /* 485 * We sit back anf wait for the slave to finish. 486 */ 487 for (i = 0; i < sets; i++) { 488 lp = &line_info[i]; 489 for (j = 0; j < stages; j++) { 490 if(ret = pthread_join(lp->stage[j]->thread, (void **)&status)) 491 err(1, "pthread_join %d,%d", i, j); 492 DBG("Thread %d,%d status %d\n", i, j, status); 493 } 494 } 495 496 /* 497 * See how long the work took. 498 */ 499 timer = mach_absolute_time() - timer; 500 timer = timer / 1000000ULL; 501 printf("%d.%03d seconds elapsed.\n", 502 (int) (timer/1000ULL), (int) (timer % 1000ULL)); 503 504 return 0; 505} 506