#include <AvailabilityMacros.h>
#include <mach/thread_policy.h>
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>
#include <pthread.h>
#include <sys/queue.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <err.h>
#include <errno.h>
#include <sys/sysctl.h>

/*
 * Sets is a multithreaded test/benchmarking program to evaluate
 * affinity set placement in Leopard.
 *
 * The picture here, for each set, is:
 *
 *       free                   work
 *    -> queue --> producer --> queue --> consumer --
 *   |                                               |
 *    -----------------------------------------------
 *
 *       <------ "stage" -----> <------ "stage" ----->
 *
 * We spin off sets of production line threads (2 sets by default).
 * All threads of each line set the same affinity tag (unless disabled).
 * By default there are 2 stage (worker) threads per production line.
 * A worker thread removes a buffer from an input queue, processes it and
 * queues it on an output queue.  By default the initial stage (producer)
 * writes every byte in a buffer and the other (consumer) stages read every
 * byte.  By default the buffers are 1MB (256 pages) in size but this can be
 * overridden.  By default there are 2 buffers per set (again overridable).
 * Worker threads process (iterate over) 10000 buffers by default.
 *
 * With affinity enabled, each producer and consumer thread sets its affinity
 * to the set number, 1 .. N.  So the threads of each set share an L2 cache.
 *
 * Buffer management uses pthread mutex/condition variables.  A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue.  Queues are tailq's a la <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
 */

pthread_mutex_t funnel;		/* serializes diagnostic output */
pthread_cond_t barrier;		/* start barrier for all worker threads */

uint64_t timer;			/* mach_absolute_time() at start, delta at end */
int threads;			/* total worker threads (sets * stages) */
int threads_ready = 0;		/* count of threads at the start barrier */

int iterations = 10000;		/* buffers each thread processes (-i) */
boolean_t affinity = FALSE;	/* use affinity sets (-a) */
boolean_t halting = FALSE;	/* wait for keypress before starting (-t) */
boolean_t cache_config = FALSE;	/* auto-configure from cache topology (-c) */
int verbosity = 1;		/* 0..2 (-v) */

/* A work item: one buffer circulating around a production line. */
typedef struct work {
	TAILQ_ENTRY(work)	link;
	int			*data;
} work_t;

/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t		mtx;
	pthread_cond_t		cnd;
	TAILQ_HEAD(, work)	queue;
	boolean_t		waiters;	/* TRUE if a consumer is blocked */
} work_queue_t;

/* Worker functions take an integer array and size */
typedef void (worker_fn_t)(int *, int);

/* This struct controls the function of a thread */
typedef struct {
	int			stagenum;
	char			*name;		/* "producer" or "consumer" */
	worker_fn_t		*fn;		/* how to process a buffer */
	work_queue_t		*input;
	work_queue_t		*output;
	struct line_info	*set;		/* the line this stage belongs to */
	pthread_t		thread;
	work_queue_t		bufq;		/* this stage's own queue */
} stage_info_t;

/* This defines a thread set */
#define WORKERS_MAX 10
typedef struct line_info {
	int		setnum;			/* 1 .. nsets; used as affinity tag */
	int		*data;			/* backing store for all buffers */
	int		isize;			/* ints per buffer */
	stage_info_t	*stage[WORKERS_MAX];
} line_info_t;

#define DBG(x...) do {				\
	if (verbosity > 1) {			\
		pthread_mutex_lock(&funnel);	\
		printf(x);			\
		pthread_mutex_unlock(&funnel);	\
	}					\
} while (0)

#define mutter(x...) do {			\
	if (verbosity > 0) {			\
		printf(x);			\
	}					\
} while (0)

#define s_if_plural(x)	(((x) > 1) ? "s" : "")

static void
usage()
{
	fprintf(stderr,
		"usage: sets [-a]   Turn affinity on (off)\n"
		"            [-b B] Number of buffers per set/line (2)\n"
		"            [-c]   Configure for max cache performance\n"
		"            [-h]   Print this\n"
		"            [-i I] Number of items/buffers to process (10000)\n"
		"            [-s S] Number of stages per set/line (2)\n"
		"            [-t]   Halt for keyboard input to start\n"
		"            [-p P] Number of pages per buffer (256=1MB)]\n"
		"            [-w]   Consumer writes data\n"
		"            [-v V] Level of verbosity 0..2 (1)\n"
		"            [N]    Number of sets/lines (2)\n"
	);
	exit(1);
}

/* Trivial producer: write to each int in the buffer */
void
writer_fn(int *data, int isize)
{
	int 	i;

	for (i = 0; i < isize; i++) {
		data[i] = i;
	}
}

/*
 * Trivial consumer: read each int in the buffer.
 * NOTE(review): the read result is discarded; an optimizing compiler may
 * elide the loads entirely — confirm with the intended build flags.
 */
void
reader_fn(int *data, int isize)
{
	int	i;
	int	datum;

	for (i = 0; i < isize; i++) {
		datum = data[i];
	}
}

/* Consumer reading and writing the buffer */
void
reader_writer_fn(int *data, int isize)
{
	int	i;

	for (i = 0; i < isize; i++) {
		data[i] += 1;
	}
}

/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 */
void *
manager_fn(void *arg)
{
	stage_info_t			*sp = (stage_info_t *) arg;
	line_info_t			*lp = sp->set;
	kern_return_t			ret;
	long				iteration = 0;
	thread_t			self = mach_thread_self();

	/*
	 * Disable timesharing so the scheduler treats us as a
	 * fixed-priority thread; then, if we're using affinity sets
	 * (we are by default), set our tag to our thread set number.
	 */
	thread_extended_policy_data_t	epolicy;
	thread_affinity_policy_data_t	policy;

	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
			self, THREAD_EXTENDED_POLICY,
			(thread_policy_t) &epolicy,
			THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);

	if (affinity) {
		policy.affinity_tag = lp->setnum;
		ret = thread_policy_set(
				self, THREAD_AFFINITY_POLICY,
				(thread_policy_t) &policy,
				THREAD_AFFINITY_POLICY_COUNT);
		if (ret != KERN_SUCCESS)
			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
	}

	/* mach_thread_self() returns a ref we must release to avoid a port leak */
	(void) mach_port_deallocate(mach_task_self(), self);

	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here releases everyone and starts the timer.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		pthread_mutex_unlock(&funnel);
		if (halting) {
			printf("  all threads ready for process %d, "
				"hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		pthread_cond_broadcast(&barrier);
		timer = mach_absolute_time();
	} else {
		pthread_cond_wait(&barrier, &funnel);
		pthread_mutex_unlock(&funnel);
	}

	do {
		work_t		*workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL)
				break;
			DBG("  %s[%d,%d] iteration %ld waiting for buffer\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			sp->input->waiters = TRUE;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters = FALSE;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		pthread_mutex_unlock(&sp->input->mtx);

		DBG("  %s[%d,%d] iteration %ld work %p data %p\n",
			sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);

		/* Do our stuff with the buffer */
		sp->fn(workp->data, lp->isize);

		/*
		 * Place the buffer on the output queue.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("  %s[%d,%d] iteration %ld signaling work\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (++iteration < iterations);

	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);

	return (void *) iteration;
}

#define MAX_CACHE_DEPTH 10
/*
 * Derive buffer and set counts from the machine's cache topology:
 * fit the working set into 90% of the last-level cache and run one
 * set per LLC.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t	len;
	int	llc;
	int64_t	cacheconfig[MAX_CACHE_DEPTH];
	int64_t	cachesize[MAX_CACHE_DEPTH];

	mutter("Autoconfiguring...\n");

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
			 &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
			 &cachesize[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find the last-level cache: the deepest level with cpus sharing it.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
		if (cacheconfig[llc] != 0)
			break;

	/*
	 * Calculate number of buffers of size npages*4096 bytes
	 * that fit into 90% of the LLC.
	 */
	*nbufs = cachesize[llc] * 9 / (npages * 4096 * 10);
	mutter("  L%d (LLC) cache %qd bytes: "
		"using %d buffers of size %d bytes\n",
		llc, cachesize[llc], *nbufs, (npages * 4096));

	/*
	 * Calculate how many sets: one per LLC.
	 * cacheconfig[0] is the total cpu count.
	 */
	*nsets = cacheconfig[0]/cacheconfig[llc];
	mutter("  %qd cpus; %qd cpus per L%d cache: using %d sets\n",
		cacheconfig[0], cacheconfig[llc], llc, *nsets);
}

void (*producer_fnp)(int *data, int isize) = &writer_fn;
void (*consumer_fnp)(int *data, int isize) = &reader_fn;

int
main(int argc, char *argv[])
{
	int			i;
	int			j;
	int			pages = 256; /* 1MB */
	int			buffers = 2;
	int			sets = 2;
	int			stages = 2;
	void			*status;
	line_info_t		*line_info;
	line_info_t		*lp;
	stage_info_t		*stage_info;
	stage_info_t		*sp;
	int			ret;
	int			c;
	mach_timebase_info_data_t tbinfo;

	/* Do switch parsing: */
	while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'c':
			cache_config = TRUE;
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			if (stages >= WORKERS_MAX)
				usage();
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case '?':
		case 'h':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0)
		sets = atoi(*argv);

	if (cache_config)
		auto_config(pages, &buffers, &sets);

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
	threads = sets * stages;
	mutter("Launching %d set%s of %d threads with %saffinity, "
		"consumer reads%s data\n",
		sets, s_if_plural(sets), stages, affinity? "": "no ",
		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
	if (pages < 256)
		mutter("  %dkB bytes per buffer, ", pages * 4);
	else
		mutter("  %dMB bytes per buffer, ", pages / 256);
	mutter("%d buffer%s per set ",
		buffers, s_if_plural(buffers));
	if (buffers * pages < 256)
		mutter("(total %dkB)\n", buffers * pages * 4);
	else
		mutter("(total %dMB)\n", buffers * pages / 256);
	mutter("  processing %d buffer%s...\n",
		iterations, s_if_plural(iterations));
	line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
	stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
	if (line_info == NULL || stage_info == NULL)
		err(1, "malloc");
	for (i = 0; i < sets; i++) {
		work_t	*work_array;

		lp = &line_info[i];

		lp->setnum = i + 1;
		lp->isize = pages * 4096 / sizeof(int);
		lp->data = (int *) malloc(buffers * pages * 4096);
		if (lp->data == NULL)
			err(1, "malloc buffers for set %d", i);

		/* Set up the queue for the workers of this thread set: */
		for (j = 0; j < stages; j++) {
			sp = &stage_info[(i*stages) + j];
			sp->stagenum = j;
			sp->set = lp;
			lp->stage[j] = sp;
			pthread_mutex_init(&sp->bufq.mtx, NULL);
			pthread_cond_init(&sp->bufq.cnd, NULL);
			TAILQ_INIT(&sp->bufq.queue);
			sp->bufq.waiters = FALSE;
		}

		/*
		 * Take a second pass through the stages
		 * to define what the workers are and to interconnect their input/outputs
		 */
		for (j = 0; j < stages; j++) {
			sp = lp->stage[j];
			if (j == 0) {
				sp->fn = producer_fnp;
				sp->name = "producer";
			} else {
				sp->fn = consumer_fnp;
				sp->name = "consumer";
			}
			sp->input = &lp->stage[j]->bufq;
			sp->output = &lp->stage[(j + 1) % stages]->bufq;
		}

		/* Set up the buffers on the first worker of the set. */
		work_array = (work_t *) malloc(buffers * sizeof(work_t));
		if (work_array == NULL)
			err(1, "malloc work array for set %d", i);
		for (j = 0; j < buffers; j++) {
			work_array[j].data = lp->data + (lp->isize * j);
			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for set %d data %p\n",
				&work_array[j], i, work_array[j].data);
		}

		/* Create this set of threads */
		for (j = 0; j < stages; j++) {
			ret = pthread_create(&lp->stage[j]->thread, NULL,
					     &manager_fn,
					     (void *) lp->stage[j]);
			if (ret != 0)
				errx(1, "pthread_create %d,%d: %s",
					i, j, strerror(ret));
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (i = 0; i < sets; i++) {
		lp = &line_info[i];
		for (j = 0; j < stages; j++) {
			ret = pthread_join(lp->stage[j]->thread, &status);
			if (ret != 0)
				errx(1, "pthread_join %d,%d: %s",
					i, j, strerror(ret));
			DBG("Thread %d,%d status %ld\n", i, j, (long) status);
		}
	}

	/*
	 * See how long the work took.
	 * Convert mach ticks to nanoseconds before scaling to milliseconds;
	 * the raw tick period is not 1ns on all hardware.
	 */
	timer = mach_absolute_time() - timer;
	(void) mach_timebase_info(&tbinfo);
	timer = timer * tbinfo.numer / tbinfo.denom;	/* now nanoseconds */
	timer = timer / 1000000ULL;			/* now milliseconds */
	printf("%d.%03d seconds elapsed.\n",
		(int) (timer/1000ULL), (int) (timer % 1000ULL));

	return 0;
}