1#include <AvailabilityMacros.h>
2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
4#endif
5#include <mach/mach.h>
6#include <mach/mach_error.h>
7#include <mach/mach_time.h>
8#include <pthread.h>
9#include <sys/queue.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <unistd.h>
14#include <err.h>
15
16/*
17 * Pool is another multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
19 *
20 * The basic picture is:
21 *
22 *                  -> producer --                 -> consumer --
23 *       free     /                \    work     /                \
24 *    -> queue --      ...          --> queue --                   --
25 *   |            \                /             \                /  |
26 *   |              -> producer --                 -> consumer --    |
27 *    ---------------------------------------------------------------
28 *
29 *       <---------- "stage" ---------> <---------- "stage" --------->
30 *
31 * There are a series of work stages. Each stage has an input and an output
32 * queue and multiple threads. The first stage is the producer and subsequent
33 * stages are consumers. By defuaut there are 2 stages. There are N producer
34 * and M consumer threads. The are B buffers per producer threads circulating
35 * through the system.
36 *
37 * When affinity is enabled, each producer thread is tagged with an affinity tag
38 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
39 * the work queue it is tagged with this affinity. When a consumer dequeues a
40 * work item, it sets its affinity to this tag. Hence consumer threads migrate
41 * to the same affinity set where the data was produced.
42 *
43 * Buffer management uses pthread mutex/condition variables. A thread blocks
44 * when no buffer is available on a queue and it is signaled when a buffer
45 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
46 * The queue management is centralized in a single routine: what queues to
47 * use as input and output and what function to call for processing is
48 * data-driven.
49 */
50
51pthread_mutex_t funnel;
52pthread_cond_t	barrier;
53
54uint64_t	timer;
55int		threads;
56int		threads_ready = 0;
57
58int		iterations = 10000;
59boolean_t	affinity = FALSE;
60boolean_t	halting = FALSE;
61int		verbosity = 1;
62
63typedef struct work {
64	TAILQ_ENTRY(work)	link;
65	int			*data;
66	int			isize;
67	int			tag;
68	int			number;
69} work_t;
70
71/*
72 * A work queue, complete with pthread objects for its management
73 */
74typedef struct work_queue {
75	pthread_mutex_t		mtx;
76	pthread_cond_t		cnd;
77	TAILQ_HEAD(, work)	queue;
78	unsigned int		waiters;
79} work_queue_t;
80
81/* Worker functions take a integer array and size */
82typedef void (worker_fn_t)(int *, int);
83
84/* This struct controls the function of a stage */
85#define WORKERS_MAX 10
86typedef struct {
87	int			stagenum;
88	char			*name;
89	worker_fn_t		*fn;
90	work_queue_t		*input;
91	work_queue_t		*output;
92	work_queue_t		bufq;
93	int			work_todo;
94} stage_info_t;
95
96/* This defines a worker thread */
97typedef struct worker_info {
98	int			setnum;
99	stage_info_t		*stage;
100	pthread_t		thread;
101} worker_info_t;
102
103#define DBG(x...) do {				\
104	if (verbosity > 1) {			\
105		pthread_mutex_lock(&funnel);	\
106		printf(x);			\
107		pthread_mutex_unlock(&funnel);	\
108	}					\
109} while (0)
110
111#define mutter(x...) do {			\
112	if (verbosity > 0) {			\
113		printf(x);			\
114	}					\
115} while (0)
116
117#define s_if_plural(x)	(((x) > 1) ? "s" : "")
118
119static void
120usage()
121{
122	fprintf(stderr,
123#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
124		"usage: pool [-a]    Turn affinity on (off)\n"
125		"            [-b B]  Number of buffers per producer (2)\n"
126#else
127		"usage: pool [-b B]  Number of buffers per producer (2)\n"
128#endif
129		"            [-i I]  Number of buffers to produce (10000)\n"
130		"            [-s S]  Number of stages (2)\n"
131		"            [-p P]  Number of pages per buffer (256=1MB)]\n"
132		"            [-w]    Consumer writes data\n"
133		"            [-v V]  Verbosity level 0..2 (1)\n"
134		"            [N [M]] Number of producer and consumers (2)\n"
135	);
136	exit(1);
137}
138
139/* Trivial producer: write to each byte */
140void
141writer_fn(int *data, int isize)
142{
143	int 	i;
144
145	for (i = 0; i < isize; i++) {
146		data[i] = i;
147	}
148}
149
150/* Trivial consumer: read each byte */
151void
152reader_fn(int *data, int isize)
153{
154	int 	i;
155	int	datum;
156
157	for (i = 0; i < isize; i++) {
158		datum = data[i];
159	}
160}
161
162/* Consumer reading and writing the buffer */
163void
164reader_writer_fn(int *data, int isize)
165{
166	int 	i;
167
168	for (i = 0; i < isize; i++) {
169		data[i] += 1;
170	}
171}
172
173void
174affinity_set(int tag)
175{
176#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
177	kern_return_t			ret;
178	thread_affinity_policy_data_t	policy;
179	if (affinity) {
180		policy.affinity_tag = tag;
181		ret = thread_policy_set(
182				mach_thread_self(), THREAD_AFFINITY_POLICY,
183				(thread_policy_t) &policy,
184				THREAD_AFFINITY_POLICY_COUNT);
185		if (ret != KERN_SUCCESS)
186			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
187	}
188#endif
189}
190
191/*
192 * This is the central function for every thread.
193 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
194 */
195void *
196manager_fn(void *arg)
197{
198	worker_info_t	*wp = (worker_info_t *) arg;
199	stage_info_t	*sp = wp->stage;
200	boolean_t	is_producer = (sp->stagenum == 0);
201	long		iteration = 0;
202	int		current_tag = 0;
203
204#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
205	kern_return_t			ret;
206	thread_extended_policy_data_t	epolicy;
207	epolicy.timeshare = FALSE;
208	ret = thread_policy_set(
209			mach_thread_self(), THREAD_EXTENDED_POLICY,
210			(thread_policy_t) &epolicy,
211			THREAD_EXTENDED_POLICY_COUNT);
212	if (ret != KERN_SUCCESS)
213		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
214
215#endif
216	/*
217	 * If we're using affinity sets and we're a producer
218	 * set our tag to by our thread set number.
219	 */
220	if (affinity && is_producer) {
221		affinity_set(wp->setnum);
222		current_tag = wp->setnum;
223	}
224
225	DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);
226
227	/*
228	 * Start barrier.
229	 * The tets thread to get here releases everyone and starts the timer.
230	 */
231	pthread_mutex_lock(&funnel);
232	threads_ready++;
233	if (threads_ready == threads) {
234		pthread_mutex_unlock(&funnel);
235		if (halting) {
236			printf("  all threads ready for process %d, "
237				"hit any key to start", getpid());
238			fflush(stdout);
239			(void) getchar();
240		}
241		pthread_cond_broadcast(&barrier);
242		timer = mach_absolute_time();
243	} else {
244		pthread_cond_wait(&barrier, &funnel);
245		pthread_mutex_unlock(&funnel);
246	}
247
248	do {
249		work_t		*workp;
250
251		/*
252		 * Get a buffer from the input queue.
253		 * Block if none.
254		 * Quit if all work done.
255		 */
256		pthread_mutex_lock(&sp->input->mtx);
257		while (1) {
258			if (sp->work_todo == 0) {
259				pthread_mutex_unlock(&sp->input->mtx);
260				goto out;
261			}
262			workp = TAILQ_FIRST(&(sp->input->queue));
263			if (workp != NULL)
264				break;
265			DBG("    %s[%d,%d] todo %d waiting for buffer\n",
266				sp->name, wp->setnum, sp->stagenum, sp->work_todo);
267			sp->input->waiters++;
268			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
269			sp->input->waiters--;
270		}
271		TAILQ_REMOVE(&(sp->input->queue), workp, link);
272		iteration = sp->work_todo--;
273		pthread_mutex_unlock(&sp->input->mtx);
274
275		if (is_producer) {
276			workp->number = iteration;
277			workp->tag = wp->setnum;
278		} else {
279			if (affinity && current_tag != workp->tag) {
280				affinity_set(workp->tag);
281				current_tag = workp->tag;
282			}
283		}
284
285		DBG("  %s[%d,%d] todo %d work %p data %p\n",
286			sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);
287
288		/* Do our stuff with the buffer */
289		(void) sp->fn(workp->data, workp->isize);
290
291		/*
292		 * Place the buffer on the input queue of the next stage.
293		 * Signal waiters if required.
294		 */
295		pthread_mutex_lock(&sp->output->mtx);
296		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
297		if (sp->output->waiters) {
298			DBG("    %s[%d,%d] todo %d signaling work\n",
299				sp->name, wp->setnum, sp->stagenum, iteration);
300			pthread_cond_signal(&sp->output->cnd);
301		}
302		pthread_mutex_unlock(&sp->output->mtx);
303
304	} while (1);
305
306out:
307	pthread_cond_broadcast(&sp->output->cnd);
308
309	DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);
310
311	return (void *) iteration;
312}
313
314void (*producer_fnp)(int *data, int isize) = &writer_fn;
315void (*consumer_fnp)(int *data, int isize) = &reader_fn;
316
317int
318main(int argc, char *argv[])
319{
320	int			i;
321	int			j;
322	int			k;
323	int			pages = 256; /* 1MB */
324	int			buffers = 2;
325	int			producers = 2;
326	int			consumers = 2;
327	int			stages = 2;
328	int			*status;
329	stage_info_t		*stage_info;
330	stage_info_t		*sp;
331	worker_info_t		*worker_info;
332	worker_info_t		*wp;
333	kern_return_t		ret;
334	int			c;
335
336	/* Do switch parsing: */
337	while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) {
338		switch (c) {
339		case 'a':
340#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
341			affinity = !affinity;
342			break;
343#else
344			usage();
345#endif
346		case 'b':
347			buffers = atoi(optarg);
348			break;
349		case 'i':
350			iterations = atoi(optarg);
351			break;
352		case 'p':
353			pages = atoi(optarg);
354			break;
355		case 's':
356			stages = atoi(optarg);
357			if (stages >= WORKERS_MAX)
358				usage();
359			break;
360		case 't':
361			halting = TRUE;
362			break;
363		case 'w':
364			consumer_fnp = &reader_writer_fn;
365			break;
366		case 'v':
367			verbosity = atoi(optarg);
368			break;
369		case 'h':
370		case '?':
371		default:
372			usage();
373		}
374	}
375	argc -= optind; argv += optind;
376	if (argc > 0)
377		producers = atoi(*argv);
378	argc--; argv++;
379	if (argc > 0)
380		consumers = atoi(*argv);
381
382	pthread_mutex_init(&funnel, NULL);
383	pthread_cond_init(&barrier, NULL);
384
385	/*
386 	 * Fire up the worker threads.
387	 */
388	threads = consumers * (stages - 1) + producers;
389	mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
390		"  with %saffinity, consumer reads%s data\n",
391		producers, s_if_plural(producers),
392		stages - 1, s_if_plural(stages - 1),
393		consumers, s_if_plural(consumers),
394		affinity? "": "no ",
395		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
396	if (pages < 256)
397		mutter("  %dkB bytes per buffer, ", pages * 4);
398	else
399		mutter("  %dMB bytes per buffer, ", pages / 256);
400	mutter("%d buffer%s per producer ",
401		buffers, s_if_plural(buffers));
402	if (buffers * pages < 256)
403		mutter("(total %dkB)\n", buffers * pages * 4);
404	else
405		mutter("(total %dMB)\n", buffers * pages / 256);
406	mutter("  processing %d buffer%s...\n",
407		iterations, s_if_plural(iterations));
408
409	stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
410	worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
411
412	/* Set up the queue for the workers of this thread set: */
413	for (i = 0; i < stages; i++) {
414		sp = &stage_info[i];
415		sp->stagenum = i;
416		pthread_mutex_init(&sp->bufq.mtx, NULL);
417		pthread_cond_init(&sp->bufq.cnd, NULL);
418		TAILQ_INIT(&sp->bufq.queue);
419		sp->bufq.waiters = 0;
420		if (i == 0) {
421			sp->fn = producer_fnp;
422			sp->name = "producer";
423		} else {
424			sp->fn = consumer_fnp;
425			sp->name = "consumer";
426		}
427		sp->input = &sp->bufq;
428		sp->output = &stage_info[(i + 1) % stages].bufq;
429		stage_info[i].work_todo = iterations;
430	}
431
432	/* Create the producers */
433	for (i = 0; i < producers; i++) {
434		work_t	*work_array;
435		int	*data;
436		int	isize;
437
438		isize = pages * 4096 / sizeof(int);
439		data = (int *) malloc(buffers * pages * 4096);
440
441		/* Set up the empty work buffers */
442		work_array = (work_t *)  malloc(buffers * sizeof(work_t));
443		for (j = 0; j < buffers; j++) {
444			work_array[j].data = data + (isize * j);
445			work_array[j].isize = isize;
446			work_array[j].tag = 0;
447			TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
448			DBG("  empty work item %p for data %p\n",
449				&work_array[j], work_array[j].data);
450		}
451		wp = &worker_info[i];
452		wp->setnum = i + 1;
453		wp->stage = &stage_info[0];
454		if (ret = pthread_create(&wp->thread,
455					 NULL,
456					 &manager_fn,
457					 (void *) wp))
458			err(1, "pthread_create %d,%d", 0, i);
459	}
460
461	/* Create consumers */
462	for (i = 1; i < stages; i++) {
463		for (j = 0; j < consumers; j++) {
464			wp = &worker_info[producers + (consumers*(i-1)) + j];
465			wp->setnum = j + 1;
466			wp->stage = &stage_info[i];
467			if (ret = pthread_create(&wp->thread,
468						NULL,
469						&manager_fn,
470						(void *) wp))
471				err(1, "pthread_create %d,%d", i, j);
472		}
473	}
474
475	/*
476	 * We sit back anf wait for the slaves to finish.
477	 */
478	for (k = 0; k < threads; k++) {
479		int	i;
480		int	j;
481
482		wp = &worker_info[k];
483		if (k < producers) {
484			i = 0;
485			j = k;
486		} else {
487			i = (k - producers) / consumers;
488			j = (k - producers) % consumers;
489		}
490		if(ret = pthread_join(wp->thread, (void **)&status))
491		    err(1, "pthread_join %d,%d", i, j);
492		DBG("Thread %d,%d status %d\n", i, j, status);
493	}
494
495	/*
496	 * See how long the work took.
497	 */
498	timer = mach_absolute_time() - timer;
499	timer = timer / 1000000ULL;
500	printf("%d.%03d seconds elapsed.\n",
501		(int) (timer/1000ULL), (int) (timer % 1000ULL));
502
503	return 0;
504}
505