1#include <AvailabilityMacros.h>
2#include <mach/thread_policy.h>
3#include <mach/mach.h>
4#include <mach/mach_error.h>
5#include <mach/mach_time.h>
6#include <pthread.h>
7#include <sys/queue.h>
8#include <stdio.h>
9#include <stdlib.h>
10#include <string.h>
11#include <unistd.h>
12#include <err.h>
13
14/*
15 * Pool is another multithreaded test/benchmarking program to evaluate
16 * affinity set placement in Leopard.
17 *
18 * The basic picture is:
19 *
20 *                  -> producer --                 -> consumer --
21 *       free     /                \    work     /                \
22 *    -> queue --      ...          --> queue --                   --
23 *   |            \                /             \                /  |
24 *   |              -> producer --                 -> consumer --    |
25 *    ---------------------------------------------------------------
26 *
27 *       <---------- "stage" ---------> <---------- "stage" --------->
28 *
29 * There are a series of work stages. Each stage has an input and an output
30 * queue and multiple threads. The first stage is the producer and subsequent
 * stages are consumers. By default there are 2 stages. There are N producer
 * and M consumer threads. There are B buffers per producer thread circulating
 * through the system.
34 *
35 * When affinity is enabled, each producer thread is tagged with an affinity tag
36 * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
37 * the work queue it is tagged with this affinity. When a consumer dequeues a
38 * work item, it sets its affinity to this tag. Hence consumer threads migrate
39 * to the same affinity set where the data was produced.
40 *
41 * Buffer management uses pthread mutex/condition variables. A thread blocks
42 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tailq's a la <sys/queue.h>.
44 * The queue management is centralized in a single routine: what queues to
45 * use as input and output and what function to call for processing is
46 * data-driven.
47 */
48
/* Mutex guarding the start-barrier count; DBG() also holds it around printf. */
pthread_mutex_t funnel;
/* Condition variable worker threads wait on at the start barrier. */
pthread_cond_t	barrier;

/*
 * Set from mach_absolute_time() when the start barrier is released;
 * main() later overwrites it with the elapsed time.
 */
uint64_t	timer;
/* Total number of worker threads, computed in main() before any is created. */
int		threads;
/* Number of worker threads that have reached the start barrier. */
int		threads_ready = 0;

/* Initial work_todo count given to every stage (-i). */
int		iterations = 10000;
/* TRUE when affinity tagging is enabled (-a toggles it). */
boolean_t	affinity = FALSE;
/* TRUE to prompt for a keypress before releasing the start barrier (-t). */
boolean_t	halting = FALSE;
/* mutter() prints when this is above 0, DBG() when it is above 1 (-v). */
int		verbosity = 1;
60
/*
 * One work item: a buffer descriptor that circulates through the
 * stage queues.
 */
typedef struct work {
	TAILQ_ENTRY(work)	link;		/* linkage on a work_queue_t queue */
	int			*data;		/* start of this item's buffer */
	int			isize;		/* buffer length, in ints */
	int			tag;		/* setnum of the producer that last filled it */
	int			number;		/* producer's work_todo value when it was filled */
} work_t;
68
/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	pthread_mutex_t		mtx;		/* held while the queue is examined or changed */
	pthread_cond_t		cnd;		/* waited on when the queue is empty */
	TAILQ_HEAD(, work)	queue;		/* the queued work items, in FIFO order */
	unsigned int		waiters;	/* threads currently blocked on cnd */
} work_queue_t;
78
/* Worker functions take an integer array and its size (a count of ints) */
typedef void (worker_fn_t)(int *, int);
81
/* This struct controls the function of a stage */
/* main() rejects a -s stage count that is at or above WORKERS_MAX. */
#define WORKERS_MAX 10
typedef struct {
	int			stagenum;	/* stage index; 0 is the producer stage */
	char			*name;		/* "producer" or "consumer", used in messages */
	worker_fn_t		*fn;		/* function applied to every buffer */
	work_queue_t		*input;		/* queue buffers are taken from (this stage's bufq) */
	work_queue_t		*output;	/* queue buffers are passed on to (next stage's bufq) */
	work_queue_t		bufq;		/* this stage's own queue */
	int			work_todo;	/* buffers this stage still has to take */
} stage_info_t;
93
/* This defines a worker thread */
typedef struct worker_info {
	int			setnum;		/* 1-based index within its stage; a producer's affinity tag */
	stage_info_t		*stage;		/* the stage this thread works for */
	pthread_t		thread;		/* handle filled in by pthread_create() */
} worker_info_t;
100
/*
 * Debug trace: printf-style output produced only when verbosity is above 1.
 * The funnel mutex is held around the printf call.
 */
#define DBG(...) do {					\
	if (verbosity > 1) {				\
		pthread_mutex_lock(&funnel);		\
		printf(__VA_ARGS__);			\
		pthread_mutex_unlock(&funnel);		\
	}						\
} while (0)
108
/* Progress message: printf-style output produced only when verbosity is above 0. */
#define mutter(...) do {				\
	if (verbosity > 0)				\
		printf(__VA_ARGS__);			\
} while (0)
114
/*
 * Plural suffix for a count: "" only when x is exactly 1, "s" otherwise,
 * so that a count of zero reads "0 buffers" rather than "0 buffer".
 */
#define s_if_plural(x)	(((x) == 1) ? "" : "s")
116
/*
 * Print the command-line synopsis on stderr and exit with status 1.
 * Does not return.
 */
static void
usage(void)
{
	fprintf(stderr,
		"usage: pool [-a]    Turn affinity on (off)\n"
		"            [-b B]  Number of buffers per producer (2)\n"
		"            [-i I]  Number of buffers to produce (10000)\n"
		"            [-s S]  Number of stages (2)\n"
		"            [-p P]  Number of pages per buffer (256=1MB)\n"
		"            [-t]    Wait for a keypress before starting\n"
		"            [-w]    Consumer writes data\n"
		"            [-v V]  Verbosity level 0..2 (1)\n"
		"            [N [M]] Number of producer and consumers (2)\n"
	);
	exit(1);
}
132
/*
 * Trivial producer: store each element's own index into the buffer.
 *   data  - buffer to fill
 *   isize - number of ints in the buffer
 */
void
writer_fn(int *data, int isize)
{
	int	idx = 0;

	while (idx < isize) {
		data[idx] = idx;
		idx++;
	}
}
143
/*
 * Trivial consumer: read every element of the buffer and discard it.
 *   data  - buffer to read
 *   isize - number of ints in the buffer
 *
 * The buffer is read through a volatile-qualified pointer.  With a plain
 * read into an unused local the compiler is free to delete the whole
 * loop, and the consumer would then generate no memory traffic at all.
 */
void
reader_fn(int *data, int isize)
{
	const volatile int	*src = data;
	int			i;

	for (i = 0; i < isize; i++) {
		int	datum = src[i];

		(void) datum;
	}
}
155
/*
 * Consumer that both reads and writes: add one to every element.
 *   data  - buffer to update in place
 *   isize - number of ints in the buffer
 */
void
reader_writer_fn(int *data, int isize)
{
	int	idx = 0;

	while (idx < isize) {
		data[idx]++;
		idx++;
	}
}
166
/*
 * Put the calling thread into the affinity set named by tag.
 * Does nothing unless the global affinity flag is set.  A failure from
 * thread_policy_set() is reported on stdout and otherwise ignored.
 */
void
affinity_set(int tag)
{
	kern_return_t			ret;
	thread_affinity_policy_data_t	policy;
	mach_port_t			self;

	if (!affinity)
		return;

	policy.affinity_tag = tag;

	/*
	 * mach_thread_self() returns a new send right on every call.
	 * Consumers may call this function once per buffer, so the right
	 * is released here instead of being leaked each time.
	 */
	self = mach_thread_self();
	ret = thread_policy_set(
			self, THREAD_AFFINITY_POLICY,
			(thread_policy_t) &policy,
			THREAD_AFFINITY_POLICY_COUNT);
	(void) mach_port_deallocate(mach_task_self(), self);

	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
}
182
/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t.
 *
 * arg points to the thread's worker_info_t.  The thread waits at the
 * start barrier, then repeatedly takes a buffer from its stage's input
 * queue, runs the stage function on it and appends it to the stage's
 * output queue, until the stage's work_todo count reaches zero.
 *
 * Returns, cast to a pointer, the work_todo value that was current when
 * this thread took its last buffer (0 if it never took one).
 */
void *
manager_fn(void *arg)
{
	worker_info_t	*wp = (worker_info_t *) arg;
	stage_info_t	*sp = wp->stage;
	boolean_t	is_producer = (sp->stagenum == 0);
	long		iteration = 0;
	int		current_tag = 0;
	mach_port_t	self;

	kern_return_t			ret;
	thread_extended_policy_data_t	epolicy;

	epolicy.timeshare = FALSE;
	/* Release the send right mach_thread_self() hands out. */
	self = mach_thread_self();
	ret = thread_policy_set(
			self, THREAD_EXTENDED_POLICY,
			(thread_policy_t) &epolicy,
			THREAD_EXTENDED_POLICY_COUNT);
	(void) mach_port_deallocate(mach_task_self(), self);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);

	/*
	 * If we're using affinity sets and we're a producer
	 * set our tag to be our thread set number.
	 */
	if (affinity && is_producer) {
		affinity_set(wp->setnum);
		current_tag = wp->setnum;
	}

	DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here starts the timer and releases everyone.
	 * The funnel stays locked until the timer is set, so no thread can
	 * leave the barrier before the clock is running, and the waiters
	 * re-check the count so a spurious wakeup cannot release them early.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		if (halting) {
			printf("  all threads ready for process %d, "
				"hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		timer = mach_absolute_time();
		pthread_cond_broadcast(&barrier);
	} else {
		while (threads_ready < threads)
			pthread_cond_wait(&barrier, &funnel);
	}
	pthread_mutex_unlock(&funnel);

	do {
		work_t		*workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 * Quit if all work done.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			if (sp->work_todo == 0) {
				pthread_mutex_unlock(&sp->input->mtx);
				goto out;
			}
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL)
				break;
			DBG("    %s[%d,%d] todo %d waiting for buffer\n",
				sp->name, wp->setnum, sp->stagenum, sp->work_todo);
			sp->input->waiters++;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters--;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		iteration = sp->work_todo--;
		pthread_mutex_unlock(&sp->input->mtx);

		if (is_producer) {
			/* Stamp the buffer so consumers can follow its affinity. */
			workp->number = (int) iteration;
			workp->tag = wp->setnum;
		} else {
			/* Migrate to the affinity set the buffer was produced in. */
			if (affinity && current_tag != workp->tag) {
				affinity_set(workp->tag);
				current_tag = workp->tag;
			}
		}

		DBG("  %s[%d,%d] todo %ld work %p data %p\n",
			sp->name, wp->setnum, sp->stagenum, iteration,
			(void *) workp, (void *) workp->data);

		/* Do our stuff with the buffer */
		sp->fn(workp->data, workp->isize);

		/*
		 * Place the buffer on the input queue of the next stage.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("    %s[%d,%d] todo %ld signaling work\n",
				sp->name, wp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);

	} while (1);

out:
	/*
	 * Wake every thread still blocked on the output queue so that it
	 * re-checks its own stage's work_todo and can exit as well.
	 */
	pthread_cond_broadcast(&sp->output->cnd);

	DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);

	return (void *) iteration;
}
303
/* Function run on each buffer by stage 0 (the producers). */
void (*producer_fnp)(int *data, int isize) = &writer_fn;
/* Function run on each buffer by every later stage; -w selects reader_writer_fn. */
void (*consumer_fnp)(int *data, int isize) = &reader_fn;
306
/*
 * Allocate a zero-filled array of count elements of size bytes each.
 * Exits with a diagnostic if the allocation fails.
 */
static void *
pool_alloc(size_t count, size_t size)
{
	void	*p = calloc(count, size);

	if (p == NULL)
		err(1, "calloc(%zu, %zu)", count, size);
	return p;
}

/*
 * Parse the command line, build the stage queues and buffers, start the
 * producer and consumer threads, wait for all of them to finish and
 * print the elapsed time.
 *
 * Returns 0 on success.  Exits with status 1 on a usage error, on an
 * allocation failure or when a thread cannot be created or joined.
 */
int
main(int argc, char *argv[])
{
	int			i;
	int			j;
	int			k;
	int			pages = 256; /* 1MB */
	int			buffers = 2;
	int			producers = 2;
	int			consumers = 2;
	int			stages = 2;
	int			isize;
	size_t			buf_ints;
	long long		total_pages;
	void			*status;
	stage_info_t		*stage_info;
	stage_info_t		*sp;
	worker_info_t		*worker_info;
	worker_info_t		*wp;
	mach_timebase_info_data_t	timebase;
	int			ret;
	int			c;

	/* Do switch parsing: */
	while ((c = getopt (argc, argv, "ab:hi:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			if (stages >= WORKERS_MAX)
				usage();
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0)
		producers = atoi(*argv);
	argc--; argv++;
	if (argc > 0)
		consumers = atoi(*argv);

	/*
	 * Reject counts that would index outside the stage array, leave a
	 * stage without threads or buffers, or never let work_todo reach 0.
	 */
	if (stages < 1 || producers < 1 || buffers < 1 || pages < 1 ||
	    iterations < 0 || (stages > 1 && consumers < 1))
		usage();

	/* Buffer length in ints; it is stored in an int field, so it must fit. */
	buf_ints = (size_t) pages * (4096 / sizeof(int));
	isize = (int) buf_ints;
	if (isize < 0 || (size_t) isize != buf_ints)
		errx(1, "%d pages per buffer is too large", pages);
	total_pages = (long long) buffers * pages;

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 */
	threads = consumers * (stages - 1) + producers;
	mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
		"  with %saffinity, consumer reads%s data\n",
		producers, s_if_plural(producers),
		stages - 1, s_if_plural(stages - 1),
		consumers, s_if_plural(consumers),
		affinity? "": "no ",
		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
	if (pages < 256)
		mutter("  %dkB bytes per buffer, ", pages * 4);
	else
		mutter("  %dMB bytes per buffer, ", pages / 256);
	mutter("%d buffer%s per producer ",
		buffers, s_if_plural(buffers));
	if (total_pages < 256)
		mutter("(total %lldkB)\n", total_pages * 4);
	else
		mutter("(total %lldMB)\n", total_pages / 256);
	mutter("  processing %d buffer%s...\n",
		iterations, s_if_plural(iterations));

	stage_info = pool_alloc((size_t) stages, sizeof(*stage_info));
	worker_info = pool_alloc((size_t) threads, sizeof(*worker_info));

	/* Set up the queue for the workers of this thread set: */
	for (i = 0; i < stages; i++) {
		sp = &stage_info[i];
		sp->stagenum = i;
		pthread_mutex_init(&sp->bufq.mtx, NULL);
		pthread_cond_init(&sp->bufq.cnd, NULL);
		TAILQ_INIT(&sp->bufq.queue);
		sp->bufq.waiters = 0;
		if (i == 0) {
			sp->fn = producer_fnp;
			sp->name = "producer";
		} else {
			sp->fn = consumer_fnp;
			sp->name = "consumer";
		}
		/* Each stage reads its own queue and feeds the next one; the last wraps to stage 0. */
		sp->input = &sp->bufq;
		sp->output = &stage_info[(i + 1) % stages].bufq;
		sp->work_todo = iterations;
	}

	/* Create the producers */
	for (i = 0; i < producers; i++) {
		work_t	*work_array;
		int	*data;

		data = pool_alloc((size_t) buffers, buf_ints * sizeof(int));

		/*
		 * Set up the empty work buffers.  The queue is changed
		 * without its mutex: producers started earlier are still
		 * held at the start barrier and do not touch it yet.
		 */
		work_array = pool_alloc((size_t) buffers, sizeof(*work_array));
		for (j = 0; j < buffers; j++) {
			work_array[j].data = data + (buf_ints * (size_t) j);
			work_array[j].isize = isize;
			work_array[j].tag = 0;
			TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for data %p\n",
				(void *) &work_array[j], (void *) work_array[j].data);
		}
		wp = &worker_info[i];
		wp->setnum = i + 1;
		wp->stage = &stage_info[0];
		/* pthread functions return the error code and leave errno alone. */
		ret = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp);
		if (ret != 0)
			errx(1, "pthread_create %d,%d: %s", 0, i, strerror(ret));
	}

	/* Create consumers */
	for (i = 1; i < stages; i++) {
		for (j = 0; j < consumers; j++) {
			wp = &worker_info[producers + (consumers*(i-1)) + j];
			wp->setnum = j + 1;
			wp->stage = &stage_info[i];
			ret = pthread_create(&wp->thread, NULL, &manager_fn, (void *) wp);
			if (ret != 0)
				errx(1, "pthread_create %d,%d: %s", i, j, strerror(ret));
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (k = 0; k < threads; k++) {
		int	stage;
		int	member;

		wp = &worker_info[k];
		if (k < producers) {
			stage = 0;
			member = k;
		} else {
			/* Consumer stages are numbered from 1, as at creation. */
			stage = 1 + (k - producers) / consumers;
			member = (k - producers) % consumers;
		}
		ret = pthread_join(wp->thread, &status);
		if (ret != 0)
			errx(1, "pthread_join %d,%d: %s", stage, member, strerror(ret));
		DBG("Thread %d,%d status %ld\n", stage, member, (long) status);
	}

	/*
	 * See how long the work took.
	 * mach_absolute_time() counts in units given by mach_timebase_info(),
	 * so convert to nanoseconds before scaling to milliseconds.
	 */
	timer = mach_absolute_time() - timer;
	if (mach_timebase_info(&timebase) != KERN_SUCCESS || timebase.denom == 0) {
		timebase.numer = 1;
		timebase.denom = 1;
	}
	timer = timer * timebase.numer / timebase.denom;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
		(int) (timer/1000ULL), (int) (timer % 1000ULL));

	return 0;
}
491