1#include <AvailabilityMacros.h>
2#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
3#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
4#endif
5#include <mach/mach.h>
6#include <mach/mach_error.h>
7#include <mach/mach_time.h>
8#include <pthread.h>
9#include <sys/queue.h>
10#include <stdio.h>
11#include <stdlib.h>
12#include <string.h>
13#include <unistd.h>
14#include <err.h>
15#include <errno.h>
16
17/*
18 * Sets is a multithreaded test/benchmarking program to evaluate
19 * affinity set placement in Leopard.
20 *
21 * The picture here, for each set, is:
22 *
23 *       free                   work
24 *    -> queue --> producer --> queue --> consumer --
25 *   |                                               |
26 *    -----------------------------------------------
27 *
28 *       <------ "stage" -----> <------ "stage" ----->
29
30 * We spin off sets of production line threads (2 sets by default).
31 * All threads of each line sets the same affinity tag (unless disabled).
32 * By default there are 2 stage (worker) threads per production line.
33 * A worker thread removes a buffer from an input queue, processses it and
34 * queues it on an output queue.  By default the initial stage (producer)
35 * writes every byte in a buffer and the other (consumer) stages read every
36 * byte. By default the buffers are 1MB (256 pages) in size but this can be
37 * overidden.  By default there are 2 buffers per set (again overridable).
38 * Worker threads process (iterate over) 10000 buffers by default.
39 *
40 * With affinity enabled, each producer and consumer thread sets its affinity
41 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
42 *
43 * Buffer management uses pthread mutex/condition variables. A thread blocks
44 * when no buffer is available on a queue and it is signaled when a buffer
45 * is placed on an empty queue. Queues are tailq'a a la <sys/queue.h>.
46 * The queue management is centralized in a single routine: what queues to
47 * use as input and output and what function to call for processing is
48 * data-driven.
49 */
50
51pthread_mutex_t funnel;
52pthread_cond_t	barrier;
53
54uint64_t	timer;
55int		threads;
56int		threads_ready = 0;
57
58int		iterations = 10000;
59boolean_t	affinity = FALSE;
60boolean_t	halting = FALSE;
61boolean_t	cache_config = FALSE;
62int		verbosity = 1;
63
64typedef struct work {
65	TAILQ_ENTRY(work)	link;
66	int			*data;
67} work_t;
68
69/*
70 * A work queue, complete with pthread objects for its management
71 */
72typedef struct work_queue {
73	pthread_mutex_t		mtx;
74	pthread_cond_t		cnd;
75	TAILQ_HEAD(, work)	queue;
76	boolean_t		waiters;
77} work_queue_t;
78
79/* Worker functions take a integer array and size */
80typedef void (worker_fn_t)(int *, int);
81
82/* This struct controls the function of a thread */
83typedef struct {
84	int			stagenum;
85	char			*name;
86	worker_fn_t		*fn;
87	work_queue_t		*input;
88	work_queue_t		*output;
89	struct line_info	*set;
90	pthread_t		thread;
91	work_queue_t		bufq;
92} stage_info_t;
93
94/* This defines a thread set */
95#define WORKERS_MAX 10
96typedef struct line_info {
97	int			setnum;
98	int			*data;
99	int			isize;
100	stage_info_t		*stage[WORKERS_MAX];
101} line_info_t;
102
103#define DBG(x...) do {				\
104	if (verbosity > 1) {			\
105		pthread_mutex_lock(&funnel);	\
106		printf(x);			\
107		pthread_mutex_unlock(&funnel);	\
108	}					\
109} while (0)
110
111#define mutter(x...) do {			\
112	if (verbosity > 0) {			\
113		printf(x);			\
114	}					\
115} while (0)
116
117#define s_if_plural(x)	(((x) > 1) ? "s" : "")
118
119static void
120usage()
121{
122	fprintf(stderr,
123#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
124		"usage: sets [-a]   Turn affinity on (off)\n"
125		"            [-b B] Number of buffers per set/line (2)\n"
126#else
127		"usage: sets [-b B] Number of buffers per set/line (2)\n"
128#endif
129		"            [-c]   Configure for max cache performance\n"
130		"            [-h]   Print this\n"
131		"            [-i I] Number of items/buffers to process (1000)\n"
132		"            [-s S] Number of stages per set/line (2)\n"
133		"            [-t]   Halt for keyboard input to start\n"
134		"            [-p P] Number of pages per buffer (256=1MB)]\n"
135		"            [-w]   Consumer writes data\n"
136		"            [-v V] Level of verbosity 0..2 (1)\n"
137		"            [N]    Number of sets/lines (2)\n"
138	);
139	exit(1);
140}
141
142/* Trivial producer: write to each byte */
143void
144writer_fn(int *data, int isize)
145{
146	int 	i;
147
148	for (i = 0; i < isize; i++) {
149		data[i] = i;
150	}
151}
152
153/* Trivial consumer: read each byte */
154void
155reader_fn(int *data, int isize)
156{
157	int 	i;
158	int	datum;
159
160	for (i = 0; i < isize; i++) {
161		datum = data[i];
162	}
163}
164
165/* Consumer reading and writing the buffer */
166void
167reader_writer_fn(int *data, int isize)
168{
169	int 	i;
170
171	for (i = 0; i < isize; i++) {
172		data[i] += 1;
173	}
174}
175
176/*
177 * This is the central function for every thread.
178 * For each invocation, its role is ets by (a pointer to) a stage_info_t.
179 */
180void *
181manager_fn(void *arg)
182{
183	stage_info_t			*sp = (stage_info_t *) arg;
184	line_info_t			*lp = sp->set;
185	kern_return_t			ret;
186	long				iteration = 0;
187
188	/*
189	 * If we're using affinity sets (we are by default)
190	 * set our tag to by our thread set number.
191	 */
192#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
193	thread_extended_policy_data_t	epolicy;
194	thread_affinity_policy_data_t	policy;
195
196	epolicy.timeshare = FALSE;
197	ret = thread_policy_set(
198			mach_thread_self(), THREAD_EXTENDED_POLICY,
199			(thread_policy_t) &epolicy,
200			THREAD_EXTENDED_POLICY_COUNT);
201	if (ret != KERN_SUCCESS)
202		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
203
204	if (affinity) {
205		policy.affinity_tag = lp->setnum;
206		ret = thread_policy_set(
207				mach_thread_self(), THREAD_AFFINITY_POLICY,
208				(thread_policy_t) &policy,
209				THREAD_AFFINITY_POLICY_COUNT);
210		if (ret != KERN_SUCCESS)
211			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
212	}
213#endif
214
215	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
216
217	/*
218	 * Start barrier.
219	 * The tets thread to get here releases everyone and starts the timer.
220	 */
221	pthread_mutex_lock(&funnel);
222	threads_ready++;
223	if (threads_ready == threads) {
224		pthread_mutex_unlock(&funnel);
225		if (halting) {
226			printf("  all threads ready for process %d, "
227				"hit any key to start", getpid());
228			fflush(stdout);
229			(void) getchar();
230		}
231		pthread_cond_broadcast(&barrier);
232		timer = mach_absolute_time();
233	} else {
234		pthread_cond_wait(&barrier, &funnel);
235		pthread_mutex_unlock(&funnel);
236	}
237
238	do {
239		int		i;
240		work_t		*workp;
241
242		/*
243		 * Get a buffer from the input queue.
244		 * Block if none.
245		 */
246		pthread_mutex_lock(&sp->input->mtx);
247		while (1) {
248			workp = TAILQ_FIRST(&(sp->input->queue));
249			if (workp != NULL)
250				break;
251			DBG("    %s[%d,%d] iteration %d waiting for buffer\n",
252				sp->name, lp->setnum, sp->stagenum, iteration);
253			sp->input->waiters = TRUE;
254			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
255			sp->input->waiters = FALSE;
256		}
257		TAILQ_REMOVE(&(sp->input->queue), workp, link);
258		pthread_mutex_unlock(&sp->input->mtx);
259
260		DBG("  %s[%d,%d] iteration %d work %p data %p\n",
261			sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
262
263		/* Do our stuff with the buffer */
264		(void) sp->fn(workp->data, lp->isize);
265
266		/*
267		 * Place the buffer on the input queue.
268		 * Signal  waiters if required.
269		 */
270		pthread_mutex_lock(&sp->output->mtx);
271		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
272		if (sp->output->waiters) {
273			DBG("    %s[%d,%d] iteration %d signaling work\n",
274				sp->name, lp->setnum, sp->stagenum, iteration);
275			pthread_cond_signal(&sp->output->cnd);
276		}
277		pthread_mutex_unlock(&sp->output->mtx);
278	} while (++iteration < iterations);
279
280	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
281
282	return (void *) iteration;
283}
284
285#define	MAX_CACHE_DEPTH 10
286static void
287auto_config(int npages, int *nbufs, int *nsets)
288{
289	int	len;
290	int	ncpu;
291	int	llc;
292	int64_t	cacheconfig[MAX_CACHE_DEPTH];
293	int64_t	cachesize[MAX_CACHE_DEPTH];
294
295	mutter("Autoconfiguring...\n");
296
297	len = sizeof(cacheconfig);
298	if (sysctlbyname("hw.cacheconfig",
299			 &cacheconfig[0], &len, NULL, 0) != 0) {
300		printf("Unable to get hw.cacheconfig, %d\n", errno);
301		exit(1);
302	}
303	len = sizeof(cachesize);
304	if (sysctlbyname("hw.cachesize",
305			 &cachesize[0],  &len, NULL, 0) != 0) {
306		printf("Unable to get hw.cachesize, %d\n", errno);
307		exit(1);
308	}
309
310	/*
311	 * Find LLC
312	 */
313	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
314		if (cacheconfig[llc] != 0)
315			break;
316
317	/*
318	 * Calculate number of buffers of size pages*4096 bytes
319	 * fit into 90% of an L2 cache.
320	 */
321	*nbufs = cachesize[llc] * 9 / (npages * 4096 * 10);
322	mutter("  L%d (LLC) cache %qd bytes: "
323		"using %d buffers of size %d bytes\n",
324		llc, cachesize[llc], *nbufs, (npages * 4096));
325
326	/*
327	 * Calcalute how many sets:
328	 */
329	*nsets = cacheconfig[0]/cacheconfig[llc];
330	mutter("  %qd cpus; %qd cpus per L%d cache: using %d sets\n",
331		cacheconfig[0], cacheconfig[llc], llc, *nsets);
332}
333
334void (*producer_fnp)(int *data, int isize) = &writer_fn;
335void (*consumer_fnp)(int *data, int isize) = &reader_fn;
336
337int
338main(int argc, char *argv[])
339{
340	int			i;
341	int			j;
342	int			pages = 256; /* 1MB */
343	int			buffers = 2;
344	int			sets = 2;
345	int			stages = 2;
346	int			*status;
347	line_info_t		*line_info;
348	line_info_t		*lp;
349	stage_info_t		*stage_info;
350	stage_info_t		*sp;
351	kern_return_t		ret;
352	int			c;
353
354	/* Do switch parsing: */
355	while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
356		switch (c) {
357		case 'a':
358#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
359			affinity = !affinity;
360			break;
361#else
362			usage();
363#endif
364		case 'b':
365			buffers = atoi(optarg);
366			break;
367		case 'c':
368			cache_config = TRUE;
369			break;
370		case 'i':
371			iterations = atoi(optarg);
372			break;
373		case 'p':
374			pages = atoi(optarg);
375			break;
376		case 's':
377			stages = atoi(optarg);
378			if (stages >= WORKERS_MAX)
379				usage();
380			break;
381		case 't':
382			halting = TRUE;
383			break;
384		case 'w':
385			consumer_fnp = &reader_writer_fn;
386			break;
387		case 'v':
388			verbosity = atoi(optarg);
389			break;
390		case '?':
391		case 'h':
392		default:
393			usage();
394		}
395	}
396	argc -= optind; argv += optind;
397	if (argc > 0)
398		sets = atoi(*argv);
399
400	if (cache_config)
401		auto_config(pages, &buffers, &sets);
402
403	pthread_mutex_init(&funnel, NULL);
404	pthread_cond_init(&barrier, NULL);
405
406	/*
407 	 * Fire up the worker threads.
408	 */
409	threads = sets * stages;
410	mutter("Launching %d set%s of %d threads with %saffinity, "
411			"consumer reads%s data\n",
412		sets, s_if_plural(sets), stages, affinity? "": "no ",
413		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
414	if (pages < 256)
415		mutter("  %dkB bytes per buffer, ", pages * 4);
416	else
417		mutter("  %dMB bytes per buffer, ", pages / 256);
418	mutter("%d buffer%s per set ",
419		buffers, s_if_plural(buffers));
420	if (buffers * pages < 256)
421		mutter("(total %dkB)\n", buffers * pages * 4);
422	else
423		mutter("(total %dMB)\n", buffers * pages / 256);
424	mutter("  processing %d buffer%s...\n",
425		iterations, s_if_plural(iterations));
426	line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
427	stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
428	for (i = 0; i < sets; i++) {
429		work_t	*work_array;
430
431		lp = &line_info[i];
432
433		lp->setnum = i + 1;
434		lp->isize = pages * 4096 / sizeof(int);
435		lp->data = (int *) malloc(buffers * pages * 4096);
436
437		/* Set up the queue for the workers of this thread set: */
438		for (j = 0; j < stages; j++) {
439			sp = &stage_info[(i*stages) + j];
440			sp->stagenum = j;
441			sp->set = lp;
442			lp->stage[j] = sp;
443			pthread_mutex_init(&sp->bufq.mtx, NULL);
444			pthread_cond_init(&sp->bufq.cnd, NULL);
445			TAILQ_INIT(&sp->bufq.queue);
446			sp->bufq.waiters = FALSE;
447		}
448
449		/*
450		 * Take a second pass through the stages
451		 * to define what the workers are and to interconnect their input/outputs
452		 */
453		for (j = 0; j < stages; j++) {
454			sp = lp->stage[j];
455			if (j == 0) {
456				sp->fn = producer_fnp;
457				sp->name = "producer";
458			} else {
459				sp->fn = consumer_fnp;
460				sp->name = "consumer";
461			}
462			sp->input = &lp->stage[j]->bufq;
463			sp->output = &lp->stage[(j + 1) % stages]->bufq;
464		}
465
466		/* Set up the buffers on the first worker of the set. */
467		work_array = (work_t *)  malloc(buffers * sizeof(work_t));
468		for (j = 0; j < buffers; j++) {
469			work_array[j].data = lp->data + (lp->isize * j);
470			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
471			DBG("  empty work item %p for set %d data %p\n",
472				&work_array[j], i, work_array[j].data);
473		}
474
475		/* Create this set of threads */
476		for (j = 0; j < stages; j++) {
477			if (ret = pthread_create(&lp->stage[j]->thread, NULL,
478					&manager_fn,
479					(void *) lp->stage[j]))
480			err(1, "pthread_create %d,%d", i, j);
481		}
482	}
483
484	/*
485	 * We sit back anf wait for the slave to finish.
486	 */
487	for (i = 0; i < sets; i++) {
488		lp = &line_info[i];
489		for (j = 0; j < stages; j++) {
490			if(ret = pthread_join(lp->stage[j]->thread, (void **)&status))
491			    err(1, "pthread_join %d,%d", i, j);
492			DBG("Thread %d,%d status %d\n", i, j, status);
493		}
494	}
495
496	/*
497	 * See how long the work took.
498	 */
499	timer = mach_absolute_time() - timer;
500	timer = timer / 1000000ULL;
501	printf("%d.%03d seconds elapsed.\n",
502		(int) (timer/1000ULL), (int) (timer % 1000ULL));
503
504	return 0;
505}
506