1#include <AvailabilityMacros.h>
2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
4#endif
5#include <mach/mach.h>
6#include <mach/mach_error.h>
7#include <mach/mach_time.h>
8#include <pthread.h>
9#include <sys/queue.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <unistd.h>
14#include <err.h>
15#include <errno.h>
16#include <sys/sysctl.h>
17
18/*
19 * Sets is a multithreaded test/benchmarking program to evaluate
20 * affinity set placement in Leopard.
21 *
22 * The picture here, for each set, is:
23 *
24 *       free                   work
25 *    -> queue --> producer --> queue --> consumer --
26 *   |                                               |
27 *    -----------------------------------------------
28 *
29 *       <------ "stage" -----> <------ "stage" ----->
 *
 * We spin off sets of production line threads (2 sets by default).
 * All threads of each line set the same affinity tag (when affinity is
 * enabled with -a).
 * By default there are 2 stage (worker) threads per production line.
 * A worker thread removes a buffer from an input queue, processes it and
 * queues it on an output queue.  By default the initial stage (producer)
 * writes every element of a buffer and the other (consumer) stages read
 * every element. By default the buffers are 1MB (256 pages) in size but
 * this can be overridden.  By default there are 2 buffers per set (again
 * overridable).
 * Worker threads process (iterate over) 10000 buffers by default.
40 *
41 * With affinity enabled, each producer and consumer thread sets its affinity
42 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
43 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tail queues (TAILQ) a la
 * <sys/queue.h>.
 * The queue management is centralized in a single routine: which queues to
 * use as input and output and which function to call for processing are
 * data-driven.
50 */
51
/* Serializes DBG() output and guards the start-barrier state (threads_ready). */
pthread_mutex_t funnel;
/* Condition variable the worker threads wait on at the start barrier. */
pthread_cond_t	barrier;

/*
 * mach_absolute_time() stamp taken when the barrier is released;
 * main() later overwrites it with the elapsed time.
 */
uint64_t	timer;
/* Total number of worker threads (sets * stages), set by main(). */
int		threads;
/* Count of worker threads that have reached the start barrier. */
int		threads_ready = 0;

/* Number of buffers each worker thread processes before returning (-i). */
int		iterations = 10000;
/* When TRUE each worker sets its affinity tag to its set number (-a toggles). */
boolean_t	affinity = FALSE;
/* When TRUE the last thread at the barrier waits for a keypress (-t). */
boolean_t	halting = FALSE;
/* When TRUE main() calls auto_config() to pick buffer and set counts (-c). */
boolean_t	cache_config = FALSE;
/* mutter() prints when this is > 0, DBG() prints when it is > 1 (-v). */
int		verbosity = 1;
64
/* One work item: a tail-queue linkage plus the buffer it refers to. */
typedef struct work {
	/* Linkage used by the TAILQ_* macros on a work_queue_t's queue. */
	TAILQ_ENTRY(work)	link;
	/* Start of this item's buffer (an array of ints). */
	int			*data;
} work_t;
69
/*
 * A work queue, complete with pthread objects for its management
 */
typedef struct work_queue {
	/* Held while the queue or the waiters flag is examined or changed. */
	pthread_mutex_t		mtx;
	/* Signaled by manager_fn() after queuing an item while waiters is set. */
	pthread_cond_t		cnd;
	/* The queued work items, in FIFO order. */
	TAILQ_HEAD(, work)	queue;
	/* TRUE while a thread is blocked in pthread_cond_wait() on cnd. */
	boolean_t		waiters;
} work_queue_t;
79
/*
 * Signature shared by all worker routines: they receive a buffer of
 * ints and the number of ints in it.
 */
typedef void worker_fn_t(int *data, int isize);
82
/* This struct controls the function of a thread */
typedef struct {
	/* Index of this stage within its line; main() makes stage 0 the producer. */
	int			stagenum;
	/* "producer" or "consumer"; only used in DBG() output. */
	char			*name;
	/* Worker routine applied to every buffer taken from the input queue. */
	worker_fn_t		*fn;
	/* Queue this stage removes buffers from. */
	work_queue_t		*input;
	/* Queue this stage appends processed buffers to. */
	work_queue_t		*output;
	/* The line (thread set) this stage belongs to. */
	struct line_info	*set;
	/* The thread running manager_fn() for this stage. */
	pthread_t		thread;
	/* Queue owned by this stage; main() wires it up as this stage's input. */
	work_queue_t		bufq;
} stage_info_t;
94
/* This defines a thread set */
/* Capacity of the stage[] array below. */
#define WORKERS_MAX 10
typedef struct line_info {
	/* Set number, starting at 1; also used as the affinity tag. */
	int			setnum;
	/* Backing storage that main() carves into this set's buffers. */
	int			*data;
	/* Length of one buffer, counted in ints. */
	int			isize;
	/* The stages of this line, indexed by stage number. */
	stage_info_t		*stage[WORKERS_MAX];
} line_info_t;
103
/*
 * Debug trace with printf-style arguments, emitted only when verbosity
 * is above 1.  The funnel mutex is held around the printf so DBG output
 * from concurrent threads is serialized.  The arguments are evaluated
 * only when the message is actually printed.
 */
#define DBG(...) do {					\
	if (verbosity >= 2) {				\
		pthread_mutex_lock(&funnel);		\
		printf(__VA_ARGS__);			\
		pthread_mutex_unlock(&funnel);		\
	}						\
} while (0)
111
/*
 * Progress message with printf-style arguments, printed unless
 * verbosity has been turned down to 0.  No locking is done here.
 */
#define mutter(...) do {				\
	if (verbosity >= 1) {				\
		printf(__VA_ARGS__);			\
	}						\
} while (0)
117
/* Plural suffix for a count: "" for exactly one, "s" otherwise (including 0). */
#define s_if_plural(x)	(((x) == 1) ? "" : "s")
119
/*
 * Print the command-line synopsis to stderr and exit with status 1.
 * The -a switch is only advertised when the affinity API is available.
 * The defaults quoted in parentheses mirror the initial values used by
 * the option variables.
 */
static void
usage(void)
{
	fprintf(stderr,
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
		"usage: sets [-a]   Turn affinity on (off)\n"
		"            [-b B] Number of buffers per set/line (2)\n"
#else
		"usage: sets [-b B] Number of buffers per set/line (2)\n"
#endif
		"            [-c]   Configure for max cache performance\n"
		"            [-h]   Print this\n"
		"            [-i I] Number of items/buffers to process (10000)\n"
		"            [-s S] Number of stages per set/line (2)\n"
		"            [-t]   Halt for keyboard input to start\n"
		"            [-p P] Number of pages per buffer (256=1MB)\n"
		"            [-w]   Consumer writes data\n"
		"            [-v V] Level of verbosity 0..2 (1)\n"
		"            [N]    Number of sets/lines (2)\n"
	);
	exit(1);
}
142
/*
 * Trivial producer: touch the whole buffer by storing each element's
 * own index into it.  Does nothing when isize is not positive.
 */
void
writer_fn(int *data, int isize)
{
	int	*cursor = data;
	int	value = 0;

	while (value < isize) {
		*cursor = value;
		cursor++;
		value++;
	}
}
153
/*
 * Trivial consumer: read every element of the buffer and discard it.
 *
 * The loads go through a volatile-qualified pointer.  Without that the
 * values are never used, so an optimizing compiler is free to delete
 * the whole loop and the consumer stage would no longer touch the
 * buffer at all.  The buffer itself is never modified.
 */
void
reader_fn(int *data, int isize)
{
	const volatile int	*src = data;
	int			datum;
	int			i;

	for (i = 0; i < isize; i++) {
		datum = src[i];
		(void) datum;
	}
}
165
/*
 * Consumer that both reads and writes: every element of the buffer is
 * incremented by one in place.  Does nothing when isize is not positive.
 */
void
reader_writer_fn(int *data, int isize)
{
	int	*elem = data;
	int	left;

	for (left = isize; left > 0; left--) {
		*elem = *elem + 1;
		elem++;
	}
}
176
177/*
178 * This is the central function for every thread.
179 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
180 */
181void *
182manager_fn(void *arg)
183{
184	stage_info_t			*sp = (stage_info_t *) arg;
185	line_info_t			*lp = sp->set;
186	kern_return_t			ret;
187	long				iteration = 0;
188
189	/*
190	 * If we're using affinity sets (we are by default)
191	 * set our tag to by our thread set number.
192	 */
193#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
194	thread_extended_policy_data_t	epolicy;
195	thread_affinity_policy_data_t	policy;
196
197	epolicy.timeshare = FALSE;
198	ret = thread_policy_set(
199			mach_thread_self(), THREAD_EXTENDED_POLICY,
200			(thread_policy_t) &epolicy,
201			THREAD_EXTENDED_POLICY_COUNT);
202	if (ret != KERN_SUCCESS)
203		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
204
205	if (affinity) {
206		policy.affinity_tag = lp->setnum;
207		ret = thread_policy_set(
208				mach_thread_self(), THREAD_AFFINITY_POLICY,
209				(thread_policy_t) &policy,
210				THREAD_AFFINITY_POLICY_COUNT);
211		if (ret != KERN_SUCCESS)
212			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
213	}
214#endif
215
216	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
217
218	/*
219	 * Start barrier.
220	 * The tets thread to get here releases everyone and starts the timer.
221	 */
222	pthread_mutex_lock(&funnel);
223	threads_ready++;
224	if (threads_ready == threads) {
225		pthread_mutex_unlock(&funnel);
226		if (halting) {
227			printf("  all threads ready for process %d, "
228				"hit any key to start", getpid());
229			fflush(stdout);
230			(void) getchar();
231		}
232		pthread_cond_broadcast(&barrier);
233		timer = mach_absolute_time();
234	} else {
235		pthread_cond_wait(&barrier, &funnel);
236		pthread_mutex_unlock(&funnel);
237	}
238
239	do {
240		int		i;
241		work_t		*workp;
242
243		/*
244		 * Get a buffer from the input queue.
245		 * Block if none.
246		 */
247		pthread_mutex_lock(&sp->input->mtx);
248		while (1) {
249			workp = TAILQ_FIRST(&(sp->input->queue));
250			if (workp != NULL)
251				break;
252			DBG("    %s[%d,%d] iteration %d waiting for buffer\n",
253				sp->name, lp->setnum, sp->stagenum, iteration);
254			sp->input->waiters = TRUE;
255			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
256			sp->input->waiters = FALSE;
257		}
258		TAILQ_REMOVE(&(sp->input->queue), workp, link);
259		pthread_mutex_unlock(&sp->input->mtx);
260
261		DBG("  %s[%d,%d] iteration %d work %p data %p\n",
262			sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
263
264		/* Do our stuff with the buffer */
265		(void) sp->fn(workp->data, lp->isize);
266
267		/*
268		 * Place the buffer on the input queue.
269		 * Signal  waiters if required.
270		 */
271		pthread_mutex_lock(&sp->output->mtx);
272		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
273		if (sp->output->waiters) {
274			DBG("    %s[%d,%d] iteration %d signaling work\n",
275				sp->name, lp->setnum, sp->stagenum, iteration);
276			pthread_cond_signal(&sp->output->cnd);
277		}
278		pthread_mutex_unlock(&sp->output->mtx);
279	} while (++iteration < iterations);
280
281	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
282
283	return (void *) iteration;
284}
285
/* Number of entries requested from the hw.cacheconfig/hw.cachesize sysctls. */
#define	MAX_CACHE_DEPTH 10

/*
 * Choose the number of buffers and sets from the machine's cache
 * topology as reported by the hw.cacheconfig and hw.cachesize sysctls.
 *
 * npages: pages (of 4096 bytes) per buffer; must be positive.
 * nbufs:  out, number of buffers that fit in 90% of the last-level
 *         cache, but never less than 1 (a line with no buffers would
 *         block forever).
 * nsets:  out, cacheconfig[0] divided by the last-level cache's
 *         cacheconfig entry.
 *
 * Prints a message and exits with status 1 if a sysctl fails or the
 * values cannot be used.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t	len;
	int	llc;
	int64_t	bufsize;
	/*
	 * Zero-filled so that entries the kernel does not write read as
	 * "no cache at this level" in the scan below.
	 */
	int64_t	cacheconfig[MAX_CACHE_DEPTH] = { 0 };
	int64_t	cachesize[MAX_CACHE_DEPTH] = { 0 };

	mutter("Autoconfiguring...\n");

	if (npages < 1) {
		printf("Invalid number of pages per buffer, %d\n", npages);
		exit(1);
	}
	/* Computed in 64 bits so large page counts cannot overflow an int. */
	bufsize = (int64_t) npages * 4096;

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
			 &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
			 &cachesize[0],  &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find LLC: the highest level with a non-zero cacheconfig entry.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
		if (cacheconfig[llc] != 0)
			break;
	if (cacheconfig[llc] == 0) {
		printf("Unusable hw.cacheconfig, no non-zero entries\n");
		exit(1);
	}

	/*
	 * Calculate how many buffers of size npages*4096 bytes
	 * fit into 90% of the last-level cache.
	 */
	*nbufs = (int) (cachesize[llc] * 9 / (bufsize * 10));
	if (*nbufs < 1)
		*nbufs = 1;
	mutter("  L%d (LLC) cache %lld bytes: "
		"using %d buffers of size %lld bytes\n",
		llc, (long long) cachesize[llc], *nbufs, (long long) bufsize);

	/*
	 * Calculate how many sets:
	 */
	*nsets = (int) (cacheconfig[0] / cacheconfig[llc]);
	mutter("  %lld cpus; %lld cpus per L%d cache: using %d sets\n",
		(long long) cacheconfig[0], (long long) cacheconfig[llc],
		llc, *nsets);
}
334
335void (*producer_fnp)(int *data, int isize) = &writer_fn;
336void (*consumer_fnp)(int *data, int isize) = &reader_fn;
337
338int
339main(int argc, char *argv[])
340{
341	int			i;
342	int			j;
343	int			pages = 256; /* 1MB */
344	int			buffers = 2;
345	int			sets = 2;
346	int			stages = 2;
347	int			*status;
348	line_info_t		*line_info;
349	line_info_t		*lp;
350	stage_info_t		*stage_info;
351	stage_info_t		*sp;
352	kern_return_t		ret;
353	int			c;
354
355	/* Do switch parsing: */
356	while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
357		switch (c) {
358		case 'a':
359#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
360			affinity = !affinity;
361			break;
362#else
363			usage();
364#endif
365		case 'b':
366			buffers = atoi(optarg);
367			break;
368		case 'c':
369			cache_config = TRUE;
370			break;
371		case 'i':
372			iterations = atoi(optarg);
373			break;
374		case 'p':
375			pages = atoi(optarg);
376			break;
377		case 's':
378			stages = atoi(optarg);
379			if (stages >= WORKERS_MAX)
380				usage();
381			break;
382		case 't':
383			halting = TRUE;
384			break;
385		case 'w':
386			consumer_fnp = &reader_writer_fn;
387			break;
388		case 'v':
389			verbosity = atoi(optarg);
390			break;
391		case '?':
392		case 'h':
393		default:
394			usage();
395		}
396	}
397	argc -= optind; argv += optind;
398	if (argc > 0)
399		sets = atoi(*argv);
400
401	if (cache_config)
402		auto_config(pages, &buffers, &sets);
403
404	pthread_mutex_init(&funnel, NULL);
405	pthread_cond_init(&barrier, NULL);
406
407	/*
408 	 * Fire up the worker threads.
409	 */
410	threads = sets * stages;
411	mutter("Launching %d set%s of %d threads with %saffinity, "
412			"consumer reads%s data\n",
413		sets, s_if_plural(sets), stages, affinity? "": "no ",
414		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
415	if (pages < 256)
416		mutter("  %dkB bytes per buffer, ", pages * 4);
417	else
418		mutter("  %dMB bytes per buffer, ", pages / 256);
419	mutter("%d buffer%s per set ",
420		buffers, s_if_plural(buffers));
421	if (buffers * pages < 256)
422		mutter("(total %dkB)\n", buffers * pages * 4);
423	else
424		mutter("(total %dMB)\n", buffers * pages / 256);
425	mutter("  processing %d buffer%s...\n",
426		iterations, s_if_plural(iterations));
427	line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
428	stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
429	for (i = 0; i < sets; i++) {
430		work_t	*work_array;
431
432		lp = &line_info[i];
433
434		lp->setnum = i + 1;
435		lp->isize = pages * 4096 / sizeof(int);
436		lp->data = (int *) malloc(buffers * pages * 4096);
437
438		/* Set up the queue for the workers of this thread set: */
439		for (j = 0; j < stages; j++) {
440			sp = &stage_info[(i*stages) + j];
441			sp->stagenum = j;
442			sp->set = lp;
443			lp->stage[j] = sp;
444			pthread_mutex_init(&sp->bufq.mtx, NULL);
445			pthread_cond_init(&sp->bufq.cnd, NULL);
446			TAILQ_INIT(&sp->bufq.queue);
447			sp->bufq.waiters = FALSE;
448		}
449
450		/*
451		 * Take a second pass through the stages
452		 * to define what the workers are and to interconnect their input/outputs
453		 */
454		for (j = 0; j < stages; j++) {
455			sp = lp->stage[j];
456			if (j == 0) {
457				sp->fn = producer_fnp;
458				sp->name = "producer";
459			} else {
460				sp->fn = consumer_fnp;
461				sp->name = "consumer";
462			}
463			sp->input = &lp->stage[j]->bufq;
464			sp->output = &lp->stage[(j + 1) % stages]->bufq;
465		}
466
467		/* Set up the buffers on the first worker of the set. */
468		work_array = (work_t *)  malloc(buffers * sizeof(work_t));
469		for (j = 0; j < buffers; j++) {
470			work_array[j].data = lp->data + (lp->isize * j);
471			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
472			DBG("  empty work item %p for set %d data %p\n",
473				&work_array[j], i, work_array[j].data);
474		}
475
476		/* Create this set of threads */
477		for (j = 0; j < stages; j++) {
478			if (ret = pthread_create(&lp->stage[j]->thread, NULL,
479					&manager_fn,
480					(void *) lp->stage[j]))
481			err(1, "pthread_create %d,%d", i, j);
482		}
483	}
484
485	/*
486	 * We sit back anf wait for the slave to finish.
487	 */
488	for (i = 0; i < sets; i++) {
489		lp = &line_info[i];
490		for (j = 0; j < stages; j++) {
491			if(ret = pthread_join(lp->stage[j]->thread, (void **)&status))
492			    err(1, "pthread_join %d,%d", i, j);
493			DBG("Thread %d,%d status %d\n", i, j, status);
494		}
495	}
496
497	/*
498	 * See how long the work took.
499	 */
500	timer = mach_absolute_time() - timer;
501	timer = timer / 1000000ULL;
502	printf("%d.%03d seconds elapsed.\n",
503		(int) (timer/1000ULL), (int) (timer % 1000ULL));
504
505	return 0;
506}
507