1#include <AvailabilityMacros.h>
2#include <mach/thread_policy.h>
3#include <mach/mach.h>
4#include <mach/mach_error.h>
5#include <mach/mach_time.h>
6#include <pthread.h>
7#include <sys/queue.h>
8#include <stdio.h>
9#include <stdlib.h>
10#include <string.h>
11#include <unistd.h>
12#include <err.h>
13#include <errno.h>
14#include <sys/sysctl.h>
15
16/*
17 * Sets is a multithreaded test/benchmarking program to evaluate
18 * affinity set placement in Leopard.
19 *
20 * The picture here, for each set, is:
21 *
22 *       free                   work
23 *    -> queue --> producer --> queue --> consumer --
24 *   |                                               |
25 *    -----------------------------------------------
26 *
27 *       <------ "stage" -----> <------ "stage" ----->
28
 * We spin off sets of production line threads (2 sets by default).
 * All threads of each line set the same affinity tag (when affinity is
 * enabled with -a).
 * By default there are 2 stage (worker) threads per production line.
 * A worker thread removes a buffer from an input queue, processes it and
 * queues it on an output queue.  By default the initial stage (producer)
 * writes every int in a buffer and the other (consumer) stages read every
 * int. By default the buffers are 1MB (256 pages) in size but this can be
 * overridden.  By default there are 2 buffers per set (again overridable).
 * Worker threads process (iterate over) 10000 buffers by default.
38 *
39 * With affinity enabled, each producer and consumer thread sets its affinity
40 * to the set number, 1 .. N. So the threads of each set share an L2 cache.
41 *
 * Buffer management uses pthread mutex/condition variables. A thread blocks
 * when no buffer is available on a queue and it is signaled when a buffer
 * is placed on an empty queue. Queues are tail queues (TAILQ) a la
 * <sys/queue.h>.
 * The queue management is centralized in a single routine: what queues to
 * use as input and output and what function to call for processing is
 * data-driven.
48 */
49
/*
 * Start-barrier state shared by all worker threads.
 * `funnel` is also the lock DBG() takes around its printf().
 */
pthread_mutex_t	funnel;
pthread_cond_t	barrier;
int		threads = 0;		/* total worker threads; set by main() */
int		threads_ready = 0;	/* workers that have reached the barrier */

/*
 * Stamped with mach_absolute_time() when the barrier opens;
 * main() overwrites it with the elapsed time once all threads have joined.
 */
uint64_t	timer = 0;

/* Run-time settings; the defaults here can be changed by main()'s options. */
boolean_t	affinity = FALSE;	/* -a: tag each thread with its set number */
boolean_t	halting = FALSE;	/* -t: wait for a key before starting */
boolean_t	cache_config = FALSE;	/* -c: size buffers/sets from the caches */
int		iterations = 10000;	/* -i: buffers each worker processes */
int		verbosity = 1;		/* -v: 0 quiet, 1 mutter(), 2 DBG() */
62
/*
 * One unit of work: a tail-queue linkage plus a pointer to the
 * integer buffer the item carries from stage to stage.
 */
struct work {
	TAILQ_ENTRY(work)	link;	/* linkage for the queue the item is on */
	int			*data;	/* start of this item's buffer */
};
typedef struct work work_t;
67
/*
 * A tail queue of work items bundled with the pthread mutex and
 * condition variable used to manage it.
 */
struct work_queue {
	pthread_mutex_t		mtx;		/* protects queue and waiters */
	pthread_cond_t		cnd;		/* waited on when the queue is empty */
	TAILQ_HEAD(, work)	queue;		/* the queued work items */
	boolean_t		waiters;	/* TRUE while a thread waits on cnd */
};
typedef struct work_queue work_queue_t;
77
/*
 * Signature of a stage's work routine: it is handed a buffer of ints
 * and the number of ints in that buffer.
 */
typedef void worker_fn_t(int *data, int isize);
80
81/* This struct controls the function of a thread */
82typedef struct {
83	int			stagenum;
84	char			*name;
85	worker_fn_t		*fn;
86	work_queue_t		*input;
87	work_queue_t		*output;
88	struct line_info	*set;
89	pthread_t		thread;
90	work_queue_t		bufq;
91} stage_info_t;
92
93/* This defines a thread set */
94#define WORKERS_MAX 10
95typedef struct line_info {
96	int			setnum;
97	int			*data;
98	int			isize;
99	stage_info_t		*stage[WORKERS_MAX];
100} line_info_t;
101
/*
 * Debug trace: printf() the arguments when verbosity is above 1.
 * The funnel mutex is held around the printf() call.
 * The arguments are not evaluated at lower verbosity.
 */
#define DBG(...) do {					\
	if (verbosity > 1) {				\
		pthread_mutex_lock(&funnel);		\
		printf(__VA_ARGS__);			\
		pthread_mutex_unlock(&funnel);		\
	}						\
} while (0)
109
/*
 * Progress message: printf() the arguments unless verbosity is 0.
 * No locking is done here.
 */
#define mutter(...) do {				\
	if (verbosity > 0) {				\
		printf(__VA_ARGS__);			\
	}						\
} while (0)
115
/* Plural suffix for a count: "" for exactly one, "s" otherwise (including 0). */
#define s_if_plural(x)	(((x) == 1) ? "" : "s")
117
/*
 * Print the option summary on stderr and exit with status 1.
 * The values in parentheses are the defaults used when an option is absent.
 */
static void
usage(void)
{
	fprintf(stderr,
		"usage: sets [-a]   Turn affinity on (off)\n"
		"            [-b B] Number of buffers per set/line (2)\n"
		"            [-c]   Configure for max cache performance\n"
		"            [-h]   Print this\n"
		"            [-i I] Number of items/buffers to process (10000)\n"
		"            [-s S] Number of stages per set/line (2)\n"
		"            [-t]   Halt for keyboard input to start\n"
		"            [-p P] Number of pages per buffer (256=1MB)\n"
		"            [-w]   Consumer writes data\n"
		"            [-v V] Level of verbosity 0..2 (1)\n"
		"            [N]    Number of sets/lines (2)\n"
	);
	exit(1);
}
136
/*
 * Trivial producer: store into every int of the buffer.
 * Element k receives the value k; nothing is written when isize <= 0.
 */
void
writer_fn(int *data, int isize)
{
	int	idx = 0;

	while (idx < isize) {
		data[idx] = idx;
		idx++;
	}
}
147
/*
 * Trivial consumer: load every int of the buffer and discard it.
 *
 * The loads go through a volatile-qualified pointer so that the compiler
 * must actually perform them; with a plain pointer the value is unused and
 * an optimizing build is free to remove the loop.
 * The buffer is not modified.
 */
void
reader_fn(int *data, int isize)
{
	const volatile int	*src = data;
	int			i;

	for (i = 0; i < isize; i++) {
		int	datum = src[i];

		(void) datum;
	}
}
159
/*
 * Consumer that both reads and writes: every int of the buffer is
 * incremented by one in place.  Nothing is touched when isize <= 0.
 */
void
reader_writer_fn(int *data, int isize)
{
	int	*cursor = data;
	int	remaining;

	for (remaining = isize; remaining > 0; remaining--) {
		*cursor = *cursor + 1;
		cursor++;
	}
}
170
/*
 * This is the central function for every thread.
 * For each invocation, its role is set by (a pointer to) a stage_info_t:
 * the queue buffers are taken from, the function applied to each buffer
 * and the queue buffers are passed on to.
 *
 * arg is the thread's stage_info_t.  The return value is the number of
 * buffers processed, cast to a pointer.
 */
void *
manager_fn(void *arg)
{
	stage_info_t			*sp = (stage_info_t *) arg;
	line_info_t			*lp = sp->set;
	thread_extended_policy_data_t	epolicy;
	thread_affinity_policy_data_t	policy;
	mach_port_t			self;
	kern_return_t			ret;
	long				iteration = 0;

	/*
	 * Every mach_thread_self() call hands back a new reference to the
	 * thread port, so fetch it once and release it when we are done
	 * setting policies.
	 */
	self = mach_thread_self();

	/* Ask for a non-timeshare scheduling policy. */
	epolicy.timeshare = FALSE;
	ret = thread_policy_set(
			self, THREAD_EXTENDED_POLICY,
			(thread_policy_t) &epolicy,
			THREAD_EXTENDED_POLICY_COUNT);
	if (ret != KERN_SUCCESS)
		printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);

	/*
	 * If we're using affinity sets (off unless requested)
	 * set our tag to be our thread set number.
	 */
	if (affinity) {
		policy.affinity_tag = lp->setnum;
		ret = thread_policy_set(
				self, THREAD_AFFINITY_POLICY,
				(thread_policy_t) &policy,
				THREAD_AFFINITY_POLICY_COUNT);
		if (ret != KERN_SUCCESS)
			printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
	}

	(void) mach_port_deallocate(mach_task_self(), self);

	DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);

	/*
	 * Start barrier.
	 * The last thread to get here starts the timer and releases everyone.
	 * The funnel stays held until the timer is set, so a waiter that wakes
	 * early cannot get past the barrier before the official start; waiters
	 * re-check the count because pthread_cond_wait() may return spuriously.
	 */
	pthread_mutex_lock(&funnel);
	threads_ready++;
	if (threads_ready == threads) {
		if (halting) {
			printf("  all threads ready for process %d, "
				"hit any key to start", getpid());
			fflush(stdout);
			(void) getchar();
		}
		timer = mach_absolute_time();
		pthread_cond_broadcast(&barrier);
	} else {
		while (threads_ready < threads)
			pthread_cond_wait(&barrier, &funnel);
	}
	pthread_mutex_unlock(&funnel);

	/* The body runs at least once, whatever the value of iterations. */
	do {
		work_t		*workp;

		/*
		 * Get a buffer from the input queue.
		 * Block if none.
		 */
		pthread_mutex_lock(&sp->input->mtx);
		while (1) {
			workp = TAILQ_FIRST(&(sp->input->queue));
			if (workp != NULL)
				break;
			DBG("    %s[%d,%d] iteration %ld waiting for buffer\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			sp->input->waiters = TRUE;
			pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
			sp->input->waiters = FALSE;
		}
		TAILQ_REMOVE(&(sp->input->queue), workp, link);
		pthread_mutex_unlock(&sp->input->mtx);

		DBG("  %s[%d,%d] iteration %ld work %p data %p\n",
			sp->name, lp->setnum, sp->stagenum, iteration,
			(void *) workp, (void *) workp->data);

		/* Do our stuff with the buffer */
		(void) sp->fn(workp->data, lp->isize);

		/*
		 * Place the buffer on the output queue.
		 * Signal waiters if required.
		 */
		pthread_mutex_lock(&sp->output->mtx);
		TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
		if (sp->output->waiters) {
			DBG("    %s[%d,%d] iteration %ld signaling work\n",
				sp->name, lp->setnum, sp->stagenum, iteration);
			pthread_cond_signal(&sp->output->cnd);
		}
		pthread_mutex_unlock(&sp->output->mtx);
	} while (++iteration < iterations);

	DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);

	return (void *) iteration;
}
277
#define	MAX_CACHE_DEPTH 10
/*
 * Choose the number of buffers and sets from the machine's cache layout,
 * as reported by the hw.cacheconfig and hw.cachesize sysctls.
 *
 *   npages  pages (of 4096 bytes) per buffer
 *   nbufs   out: buffers of that size fitting in 90% of the last-level
 *           cache, but never fewer than 1
 *   nsets   out: cacheconfig[0] divided by the cacheconfig entry of the
 *           last-level cache (reported below as cpus / cpus per cache)
 *
 * Exits with status 1 if a sysctl fails or the inputs cannot be used.
 */
static void
auto_config(int npages, int *nbufs, int *nsets)
{
	size_t	len;
	int	llc;
	int64_t	cacheconfig[MAX_CACHE_DEPTH];
	int64_t	cachesize[MAX_CACHE_DEPTH];
	int64_t	bufsize;
	int64_t	count;

	mutter("Autoconfiguring...\n");

	/*
	 * sysctlbyname() is given the full array size but may fill in fewer
	 * entries; clear the arrays first so the scan for the last non-zero
	 * entry below never looks at uninitialized memory.
	 */
	memset(cacheconfig, 0, sizeof(cacheconfig));
	memset(cachesize, 0, sizeof(cachesize));

	len = sizeof(cacheconfig);
	if (sysctlbyname("hw.cacheconfig",
			 &cacheconfig[0], &len, NULL, 0) != 0) {
		printf("Unable to get hw.cacheconfig, %d\n", errno);
		exit(1);
	}
	len = sizeof(cachesize);
	if (sysctlbyname("hw.cachesize",
			 &cachesize[0],  &len, NULL, 0) != 0) {
		printf("Unable to get hw.cachesize, %d\n", errno);
		exit(1);
	}

	/*
	 * Find LLC: the highest level with a non-zero cacheconfig entry.
	 */
	for (llc = MAX_CACHE_DEPTH - 1; llc > 0; llc--)
		if (cacheconfig[llc] != 0)
			break;

	/* Both values are used as divisors below. */
	bufsize = (int64_t) npages * 4096;
	if (bufsize <= 0 || cacheconfig[llc] == 0) {
		printf("Unable to autoconfigure: %d pages per buffer, "
			"%lld cpus per L%d cache\n",
			npages, (long long) cacheconfig[llc], llc);
		exit(1);
	}

	/*
	 * Calculate number of buffers of size pages*4096 bytes
	 * fit into 90% of the last-level cache.  The arithmetic is done
	 * in 64 bits so large page counts cannot overflow.  A line with no
	 * buffers would leave every stage blocked on an empty queue, so
	 * use at least one even if it does not fit.
	 */
	count = cachesize[llc] * 9 / (bufsize * 10);
	if (count < 1)
		count = 1;
	*nbufs = (int) count;
	mutter("  L%d (LLC) cache %lld bytes: "
		"using %d buffers of size %d bytes\n",
		llc, (long long) cachesize[llc], *nbufs, (npages * 4096));

	/*
	 * Calculate how many sets:
	 */
	*nsets = (int) (cacheconfig[0] / cacheconfig[llc]);
	mutter("  %lld cpus; %lld cpus per L%d cache: using %d sets\n",
		(long long) cacheconfig[0], (long long) cacheconfig[llc],
		llc, *nsets);
}
326
327void (*producer_fnp)(int *data, int isize) = &writer_fn;
328void (*consumer_fnp)(int *data, int isize) = &reader_fn;
329
/*
 * Parse the options, build the sets of stages with their queues and
 * buffers, start one thread per stage, wait for all of them and report
 * the elapsed time measured from the moment the start barrier opened.
 *
 * Returns 0 on success; exits with status 1 (via usage() or err()) on bad
 * arguments, allocation failure or a pthread error.
 */
int
main(int argc, char *argv[])
{
	int				i;
	int				j;
	int				pages = 256; /* 1MB */
	int				buffers = 2;
	int				sets = 2;
	int				stages = 2;
	void				*status;
	line_info_t			*line_info;
	line_info_t			*lp;
	stage_info_t			*stage_info;
	stage_info_t			*sp;
	mach_timebase_info_data_t	timebase;
	size_t				bufbytes;
	int				ret;
	int				c;

	/* Do switch parsing: */
	while ((c = getopt(argc, argv, "ab:chi:p:s:twv:")) != -1) {
		switch (c) {
		case 'a':
			affinity = !affinity;
			break;
		case 'b':
			buffers = atoi(optarg);
			break;
		case 'c':
			cache_config = TRUE;
			break;
		case 'i':
			iterations = atoi(optarg);
			break;
		case 'p':
			pages = atoi(optarg);
			break;
		case 's':
			stages = atoi(optarg);
			/*
			 * stages indexes line_info_t.stage[] and is used as
			 * a modulus below, so it must be at least 1.
			 */
			if (stages < 1 || stages >= WORKERS_MAX)
				usage();
			break;
		case 't':
			halting = TRUE;
			break;
		case 'w':
			consumer_fnp = &reader_writer_fn;
			break;
		case 'v':
			verbosity = atoi(optarg);
			break;
		case '?':
		case 'h':
		default:
			usage();
		}
	}
	argc -= optind; argv += optind;
	if (argc > 0)
		sets = atoi(*argv);

	/* The page count sizes every allocation and is a divisor in auto_config(). */
	if (pages < 1)
		usage();

	if (cache_config)
		auto_config(pages, &buffers, &sets);

	/*
	 * A line without buffers would leave all of its stages blocked forever,
	 * and without sets no thread would ever start the timer.
	 */
	if (buffers < 1 || sets < 1)
		usage();

	pthread_mutex_init(&funnel, NULL);
	pthread_cond_init(&barrier, NULL);

	/*
	 * Fire up the worker threads.
	 * The thread total must be known before the first thread is created
	 * because the start barrier in manager_fn() counts up to it.
	 */
	threads = sets * stages;
	mutter("Launching %d set%s of %d threads with %saffinity, "
			"consumer reads%s data\n",
		sets, s_if_plural(sets), stages, affinity? "": "no ",
		(consumer_fnp == &reader_writer_fn)? " and writes" : "");
	if (pages < 256)
		mutter("  %dkB bytes per buffer, ", pages * 4);
	else
		mutter("  %dMB bytes per buffer, ", pages / 256);
	mutter("%d buffer%s per set ",
		buffers, s_if_plural(buffers));
	if (buffers * pages < 256)
		mutter("(total %dkB)\n", buffers * pages * 4);
	else
		mutter("(total %dMB)\n", buffers * pages / 256);
	mutter("  processing %d buffer%s...\n",
		iterations, s_if_plural(iterations));

	/* Sizes are computed in size_t so large page/buffer counts do not overflow int. */
	bufbytes = (size_t) pages * 4096;
	line_info = malloc((size_t) sets * sizeof(*line_info));
	stage_info = malloc((size_t) sets * (size_t) stages * sizeof(*stage_info));
	if (line_info == NULL || stage_info == NULL)
		err(1, "malloc");

	for (i = 0; i < sets; i++) {
		work_t	*work_array;

		lp = &line_info[i];

		lp->setnum = i + 1;
		lp->isize = (int) (bufbytes / sizeof(int));
		lp->data = malloc((size_t) buffers * bufbytes);
		if (lp->data == NULL)
			err(1, "malloc");

		/* Set up the queue for the workers of this thread set: */
		for (j = 0; j < stages; j++) {
			sp = &stage_info[(i*stages) + j];
			sp->stagenum = j;
			sp->set = lp;
			lp->stage[j] = sp;
			pthread_mutex_init(&sp->bufq.mtx, NULL);
			pthread_cond_init(&sp->bufq.cnd, NULL);
			TAILQ_INIT(&sp->bufq.queue);
			sp->bufq.waiters = FALSE;
		}

		/*
		 * Take a second pass through the stages
		 * to define what the workers are and to interconnect their
		 * input/outputs: each stage reads its own queue and writes the
		 * next stage's, the last one feeding back to the first.
		 */
		for (j = 0; j < stages; j++) {
			sp = lp->stage[j];
			if (j == 0) {
				sp->fn = producer_fnp;
				sp->name = "producer";
			} else {
				sp->fn = consumer_fnp;
				sp->name = "consumer";
			}
			sp->input = &lp->stage[j]->bufq;
			sp->output = &lp->stage[(j + 1) % stages]->bufq;
		}

		/* Set up the buffers on the first worker of the set. */
		work_array = malloc((size_t) buffers * sizeof(*work_array));
		if (work_array == NULL)
			err(1, "malloc");
		for (j = 0; j < buffers; j++) {
			work_array[j].data = lp->data + ((size_t) lp->isize * j);
			TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
			DBG("  empty work item %p for set %d data %p\n",
				(void *) &work_array[j], i, (void *) work_array[j].data);
		}

		/*
		 * Create this set of threads.
		 * pthread functions return the error number instead of setting
		 * errno, so copy it there for err() to report.
		 */
		for (j = 0; j < stages; j++) {
			ret = pthread_create(&lp->stage[j]->thread, NULL,
					&manager_fn,
					(void *) lp->stage[j]);
			if (ret != 0) {
				errno = ret;
				err(1, "pthread_create %d,%d", i, j);
			}
		}
	}

	/*
	 * We sit back and wait for the workers to finish.
	 */
	for (i = 0; i < sets; i++) {
		lp = &line_info[i];
		for (j = 0; j < stages; j++) {
			ret = pthread_join(lp->stage[j]->thread, &status);
			if (ret != 0) {
				errno = ret;
				err(1, "pthread_join %d,%d", i, j);
			}
			/* manager_fn() returns its iteration count cast to a pointer. */
			DBG("Thread %d,%d status %ld\n", i, j, (long) status);
		}
	}

	/*
	 * See how long the work took.
	 * mach_absolute_time() counts in machine-dependent ticks; scale by the
	 * timebase to get nanoseconds before reducing to milliseconds.
	 */
	timer = mach_absolute_time() - timer;
	if (mach_timebase_info(&timebase) != KERN_SUCCESS || timebase.denom == 0) {
		timebase.numer = 1;
		timebase.denom = 1;
	}
	timer = timer * timebase.numer / timebase.denom;
	timer = timer / 1000000ULL;
	printf("%d.%03d seconds elapsed.\n",
		(int) (timer/1000ULL), (int) (timer % 1000ULL));

	return 0;
}
495