#include #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER #include #endif #include #include #include #include #include #include #include #include #include #include /* * Pool is another multithreaded test/benchmarking program to evaluate * affinity set placement in Leopard. * * The basic picture is: * * -> producer -- -> consumer -- * free / \ work / \ * -> queue -- ... --> queue -- -- * | \ / \ / | * | -> producer -- -> consumer -- | * --------------------------------------------------------------- * * <---------- "stage" ---------> <---------- "stage" ---------> * * There are a series of work stages. Each stage has an input and an output * queue and multiple threads. The first stage is the producer and subsequent * stages are consumers. By defuaut there are 2 stages. There are N producer * and M consumer threads. The are B buffers per producer threads circulating * through the system. * * When affinity is enabled, each producer thread is tagged with an affinity tag * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to * the work queue it is tagged with this affinity. When a consumer dequeues a * work item, it sets its affinity to this tag. Hence consumer threads migrate * to the same affinity set where the data was produced. * * Buffer management uses pthread mutex/condition variables. A thread blocks * when no buffer is available on a queue and it is signaled when a buffer * is placed on an empty queue. Queues are tailq'a a la . * The queue management is centralized in a single routine: what queues to * use as input and output and what function to call for processing is * data-driven. */ pthread_mutex_t funnel; pthread_cond_t barrier; uint64_t timer; int threads; int threads_ready = 0; int iterations = 10000; boolean_t affinity = FALSE; boolean_t halting = FALSE; int verbosity = 1; typedef struct work { TAILQ_ENTRY(work) link; int *data; int isize; int tag; int number; } work_t; /* * A work queue, complete with pthread objects for its management */ typedef struct work_queue { pthread_mutex_t mtx; pthread_cond_t cnd; TAILQ_HEAD(, work) queue; unsigned int waiters; } work_queue_t; /* Worker functions take a integer array and size */ typedef void (worker_fn_t)(int *, int); /* This struct controls the function of a stage */ #define WORKERS_MAX 10 typedef struct { int stagenum; char *name; worker_fn_t *fn; work_queue_t *input; work_queue_t *output; work_queue_t bufq; int work_todo; } stage_info_t; /* This defines a worker thread */ typedef struct worker_info { int setnum; stage_info_t *stage; pthread_t thread; } worker_info_t; #define DBG(x...) do { \ if (verbosity > 1) { \ pthread_mutex_lock(&funnel); \ printf(x); \ pthread_mutex_unlock(&funnel); \ } \ } while (0) #define mutter(x...) do { \ if (verbosity > 0) { \ printf(x); \ } \ } while (0) #define s_if_plural(x) (((x) > 1) ? "s" : "") static void usage() { fprintf(stderr, #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER "usage: pool [-a] Turn affinity on (off)\n" " [-b B] Number of buffers per producer (2)\n" #else "usage: pool [-b B] Number of buffers per producer (2)\n" #endif " [-i I] Number of buffers to produce (10000)\n" " [-s S] Number of stages (2)\n" " [-p P] Number of pages per buffer (256=1MB)]\n" " [-w] Consumer writes data\n" " [-v V] Verbosity level 0..2 (1)\n" " [N [M]] Number of producer and consumers (2)\n" ); exit(1); } /* Trivial producer: write to each byte */ void writer_fn(int *data, int isize) { int i; for (i = 0; i < isize; i++) { data[i] = i; } } /* Trivial consumer: read each byte */ void reader_fn(int *data, int isize) { int i; int datum; for (i = 0; i < isize; i++) { datum = data[i]; } } /* Consumer reading and writing the buffer */ void reader_writer_fn(int *data, int isize) { int i; for (i = 0; i < isize; i++) { data[i] += 1; } } void affinity_set(int tag) { #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER kern_return_t ret; thread_affinity_policy_data_t policy; if (affinity) { policy.affinity_tag = tag; ret = thread_policy_set( mach_thread_self(), THREAD_AFFINITY_POLICY, (thread_policy_t) &policy, THREAD_AFFINITY_POLICY_COUNT); if (ret != KERN_SUCCESS) printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret); } #endif } /* * This is the central function for every thread. * For each invocation, its role is ets by (a pointer to) a stage_info_t. */ void * manager_fn(void *arg) { worker_info_t *wp = (worker_info_t *) arg; stage_info_t *sp = wp->stage; boolean_t is_producer = (sp->stagenum == 0); long iteration = 0; int current_tag = 0; #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER kern_return_t ret; thread_extended_policy_data_t epolicy; epolicy.timeshare = FALSE; ret = thread_policy_set( mach_thread_self(), THREAD_EXTENDED_POLICY, (thread_policy_t) &epolicy, THREAD_EXTENDED_POLICY_COUNT); if (ret != KERN_SUCCESS) printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret); #endif /* * If we're using affinity sets and we're a producer * set our tag to by our thread set number. */ if (affinity && is_producer) { affinity_set(wp->setnum); current_tag = wp->setnum; } DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum); /* * Start barrier. * The tets thread to get here releases everyone and starts the timer. */ pthread_mutex_lock(&funnel); threads_ready++; if (threads_ready == threads) { pthread_mutex_unlock(&funnel); if (halting) { printf(" all threads ready for process %d, " "hit any key to start", getpid()); fflush(stdout); (void) getchar(); } pthread_cond_broadcast(&barrier); timer = mach_absolute_time(); } else { pthread_cond_wait(&barrier, &funnel); pthread_mutex_unlock(&funnel); } do { work_t *workp; /* * Get a buffer from the input queue. * Block if none. * Quit if all work done. */ pthread_mutex_lock(&sp->input->mtx); while (1) { if (sp->work_todo == 0) { pthread_mutex_unlock(&sp->input->mtx); goto out; } workp = TAILQ_FIRST(&(sp->input->queue)); if (workp != NULL) break; DBG(" %s[%d,%d] todo %d waiting for buffer\n", sp->name, wp->setnum, sp->stagenum, sp->work_todo); sp->input->waiters++; pthread_cond_wait(&sp->input->cnd, &sp->input->mtx); sp->input->waiters--; } TAILQ_REMOVE(&(sp->input->queue), workp, link); iteration = sp->work_todo--; pthread_mutex_unlock(&sp->input->mtx); if (is_producer) { workp->number = iteration; workp->tag = wp->setnum; } else { if (affinity && current_tag != workp->tag) { affinity_set(workp->tag); current_tag = workp->tag; } } DBG(" %s[%d,%d] todo %d work %p data %p\n", sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data); /* Do our stuff with the buffer */ (void) sp->fn(workp->data, workp->isize); /* * Place the buffer on the input queue of the next stage. * Signal waiters if required. */ pthread_mutex_lock(&sp->output->mtx); TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link); if (sp->output->waiters) { DBG(" %s[%d,%d] todo %d signaling work\n", sp->name, wp->setnum, sp->stagenum, iteration); pthread_cond_signal(&sp->output->cnd); } pthread_mutex_unlock(&sp->output->mtx); } while (1); out: pthread_cond_broadcast(&sp->output->cnd); DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum); return (void *) iteration; } void (*producer_fnp)(int *data, int isize) = &writer_fn; void (*consumer_fnp)(int *data, int isize) = &reader_fn; int main(int argc, char *argv[]) { int i; int j; int k; int pages = 256; /* 1MB */ int buffers = 2; int producers = 2; int consumers = 2; int stages = 2; int *status; stage_info_t *stage_info; stage_info_t *sp; worker_info_t *worker_info; worker_info_t *wp; kern_return_t ret; int c; /* Do switch parsing: */ while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) { switch (c) { case 'a': #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER affinity = !affinity; break; #else usage(); #endif case 'b': buffers = atoi(optarg); break; case 'i': iterations = atoi(optarg); break; case 'p': pages = atoi(optarg); break; case 's': stages = atoi(optarg); if (stages >= WORKERS_MAX) usage(); break; case 't': halting = TRUE; break; case 'w': consumer_fnp = &reader_writer_fn; break; case 'v': verbosity = atoi(optarg); break; case 'h': case '?': default: usage(); } } argc -= optind; argv += optind; if (argc > 0) producers = atoi(*argv); argc--; argv++; if (argc > 0) consumers = atoi(*argv); pthread_mutex_init(&funnel, NULL); pthread_cond_init(&barrier, NULL); /* * Fire up the worker threads. */ threads = consumers * (stages - 1) + producers; mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n" " with %saffinity, consumer reads%s data\n", producers, s_if_plural(producers), stages - 1, s_if_plural(stages - 1), consumers, s_if_plural(consumers), affinity? "": "no ", (consumer_fnp == &reader_writer_fn)? " and writes" : ""); if (pages < 256) mutter(" %dkB bytes per buffer, ", pages * 4); else mutter(" %dMB bytes per buffer, ", pages / 256); mutter("%d buffer%s per producer ", buffers, s_if_plural(buffers)); if (buffers * pages < 256) mutter("(total %dkB)\n", buffers * pages * 4); else mutter("(total %dMB)\n", buffers * pages / 256); mutter(" processing %d buffer%s...\n", iterations, s_if_plural(iterations)); stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t)); worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t)); /* Set up the queue for the workers of this thread set: */ for (i = 0; i < stages; i++) { sp = &stage_info[i]; sp->stagenum = i; pthread_mutex_init(&sp->bufq.mtx, NULL); pthread_cond_init(&sp->bufq.cnd, NULL); TAILQ_INIT(&sp->bufq.queue); sp->bufq.waiters = 0; if (i == 0) { sp->fn = producer_fnp; sp->name = "producer"; } else { sp->fn = consumer_fnp; sp->name = "consumer"; } sp->input = &sp->bufq; sp->output = &stage_info[(i + 1) % stages].bufq; stage_info[i].work_todo = iterations; } /* Create the producers */ for (i = 0; i < producers; i++) { work_t *work_array; int *data; int isize; isize = pages * 4096 / sizeof(int); data = (int *) malloc(buffers * pages * 4096); /* Set up the empty work buffers */ work_array = (work_t *) malloc(buffers * sizeof(work_t)); for (j = 0; j < buffers; j++) { work_array[j].data = data + (isize * j); work_array[j].isize = isize; work_array[j].tag = 0; TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link); DBG(" empty work item %p for data %p\n", &work_array[j], work_array[j].data); } wp = &worker_info[i]; wp->setnum = i + 1; wp->stage = &stage_info[0]; if (ret = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp)) err(1, "pthread_create %d,%d", 0, i); } /* Create consumers */ for (i = 1; i < stages; i++) { for (j = 0; j < consumers; j++) { wp = &worker_info[producers + (consumers*(i-1)) + j]; wp->setnum = j + 1; wp->stage = &stage_info[i]; if (ret = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp)) err(1, "pthread_create %d,%d", i, j); } } /* * We sit back anf wait for the slaves to finish. */ for (k = 0; k < threads; k++) { int i; int j; wp = &worker_info[k]; if (k < producers) { i = 0; j = k; } else { i = (k - producers) / consumers; j = (k - producers) % consumers; } if(ret = pthread_join(wp->thread, (void **)&status)) err(1, "pthread_join %d,%d", i, j); DBG("Thread %d,%d status %d\n", i, j, status); } /* * See how long the work took. */ timer = mach_absolute_time() - timer; timer = timer / 1000000ULL; printf("%d.%03d seconds elapsed.\n", (int) (timer/1000ULL), (int) (timer % 1000ULL)); return 0; }