1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1997,2008 Oracle.  All rights reserved.
5 *
6 * $Id: ex_thread.c,v 12.8 2008/01/08 20:58:23 bostic Exp $
7 */
8
9#include <sys/types.h>
10#include <sys/time.h>
11
12#include <errno.h>
13#include <pthread.h>
14#include <signal.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18#include <time.h>
19
20#ifdef _WIN32
21extern int getopt(int, char * const *, const char *);
22#else
23#include <unistd.h>
24#endif
25
26#include <db.h>
27
28/*
29 * NB: This application is written using POSIX 1003.1b-1993 pthreads
30 * interfaces, which may not be portable to your system.
31 */
32extern int sched_yield __P((void));		/* Pthread yield function. */
33
34int	db_init __P((const char *));
35void   *deadlock __P((void *));
36void	fatal __P((const char *, int, int));
37void	onint __P((int));
38int	main __P((int, char *[]));
39int	reader __P((int));
40void	stats __P((void));
41void   *trickle __P((void *));
42void   *tstart __P((void *));
43int	usage __P((void));
44void	word __P((void));
45int	writer __P((int));
46
/*
 * Interrupt handling flag.  It is stored to by the SIGINT handler and
 * polled by the loop condition of every thread, so it is declared with
 * the one object type a signal handler is guaranteed to be able to
 * write, and volatile so that each poll re-reads it from memory.
 */
volatile sig_atomic_t quit;
48
/*
 * Per-thread counters.  perf points at an array indexed by the 1-based
 * thread id; writer threads update the first five fields, reader threads
 * update found/notfound, and both kinds count deadlocks in aborts.
 */
struct _statistics {
	/* Writer: aborts suffered by the transaction currently in flight. */
	int aborted;
	/* Reader and writer: running total of deadlock aborts. */
	int aborts;
	/* Writer: keys successfully added. */
	int adds;
	/* Writer: keys successfully deleted. */
	int deletes;
	/* Writer: transactions committed. */
	int txns;
	/* Reader: lookups that returned a record. */
	int found;
	/* Reader: lookups that returned DB_NOTFOUND. */
	int notfound;
} *perf;
58
/* Program name, used as the prefix of every diagnostic. */
const char *progname = "ex_thread";

/* Name of the database file the threads read and write. */
#define	DATABASE	"access.db"

/* Text file the key word list is loaded from, one key per line. */
#define	WORDLIST	"../test/wordlist"
64
/*
 * Command-line settings, filled in by main.
 *
 * The -p option asks the library to yield the processor after every DB
 * call, which greatly increases the number of collisions and transaction
 * aborts.
 */
int	punish;					/* -p: yield after DB calls. */
int	nlist;					/* -n: number of words to load. */
int	nreaders;				/* -r: number of reader threads. */
int	verbose;				/* -v: trace every operation. */
int	nwriters;				/* -w: number of writer threads. */
75
76DB     *dbp;					/* Database handle. */
77DB_ENV *dbenv;					/* Database environment. */
78int	nthreads;				/* Total threads. */
79char  **list;					/* Word list. */
80
81/*
82 * ex_thread --
83 *	Run a simple threaded application of some numbers of readers and
84 *	writers competing for a set of words.
85 *
86 * Example UNIX shell script to run this program:
87 *	% rm -rf TESTDIR
88 *	% mkdir TESTDIR
89 *	% ex_thread -h TESTDIR
90 */
91int
92main(argc, argv)
93	int argc;
94	char *argv[];
95{
96	extern char *optarg;
97	extern int errno, optind;
98	DB_TXN *txnp;
99	pthread_t *tids;
100	int ch, i, ret;
101	const char *home;
102	void *retp;
103
104	txnp = NULL;
105	nlist = 1000;
106	nreaders = nwriters = 4;
107	home = "TESTDIR";
108	while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
109		switch (ch) {
110		case 'h':
111			home = optarg;
112			break;
113		case 'p':
114			punish = 1;
115			break;
116		case 'n':
117			nlist = atoi(optarg);
118			break;
119		case 'r':
120			nreaders = atoi(optarg);
121			break;
122		case 'v':
123			verbose = 1;
124			break;
125		case 'w':
126			nwriters = atoi(optarg);
127			break;
128		case '?':
129		default:
130			return (usage());
131		}
132	argc -= optind;
133	argv += optind;
134
135	/* Initialize the random number generator. */
136	srand(getpid() | time(NULL));
137
138	/* Register the signal handler. */
139	(void)signal(SIGINT, onint);
140
141	/* Build the key list. */
142	word();
143
144	/* Remove the previous database. */
145	(void)remove(DATABASE);
146
147	/* Initialize the database environment. */
148	if ((ret = db_init(home)) != 0)
149		return (ret);
150
151	/* Initialize the database. */
152	if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
153		dbenv->err(dbenv, ret, "db_create");
154		(void)dbenv->close(dbenv, 0);
155		return (EXIT_FAILURE);
156	}
157	if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
158		dbp->err(dbp, ret, "set_pagesize");
159		goto err;
160	}
161
162	if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0)
163		fatal("txn_begin", ret, 1);
164	if ((ret = dbp->open(dbp, txnp,
165	     DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
166		dbp->err(dbp, ret, "%s: open", DATABASE);
167		goto err;
168	} else {
169		ret = txnp->commit(txnp, 0);
170		txnp = NULL;
171		if (ret != 0)
172			goto err;
173	}
174
175	nthreads = nreaders + nwriters + 2;
176	printf("Running: readers %d, writers %d\n", nreaders, nwriters);
177	fflush(stdout);
178
179	/* Create statistics structures, offset by 1. */
180	if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
181		fatal(NULL, errno, 1);
182
183	/* Create thread ID structures. */
184	if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
185		fatal(NULL, errno, 1);
186
187	/* Create reader/writer threads. */
188	for (i = 0; i < nreaders + nwriters; ++i)
189		if ((ret = pthread_create(
190		    &tids[i], NULL, tstart, (void *)(uintptr_t)i)) != 0)
191			fatal("pthread_create", ret > 0 ? ret : errno, 1);
192
193	/* Create buffer pool trickle thread. */
194	if (pthread_create(&tids[i], NULL, trickle, &i))
195		fatal("pthread_create", errno, 1);
196	++i;
197
198	/* Create deadlock detector thread. */
199	if (pthread_create(&tids[i], NULL, deadlock, &i))
200		fatal("pthread_create", errno, 1);
201
202	/* Wait for the threads. */
203	for (i = 0; i < nthreads; ++i)
204		(void)pthread_join(tids[i], &retp);
205
206	printf("Exiting\n");
207	stats();
208
209err:	if (txnp != NULL)
210		(void)txnp->abort(txnp);
211	(void)dbp->close(dbp, 0);
212	(void)dbenv->close(dbenv, 0);
213
214	return (EXIT_SUCCESS);
215}
216
217int
218reader(id)
219	int id;
220{
221	DBT key, data;
222	int n, ret;
223	char buf[64];
224
225	/*
226	 * DBT's must use local memory or malloc'd memory if the DB handle
227	 * is accessed in a threaded fashion.
228	 */
229	memset(&key, 0, sizeof(DBT));
230	memset(&data, 0, sizeof(DBT));
231	data.flags = DB_DBT_MALLOC;
232
233	/*
234	 * Read-only threads do not require transaction protection, unless
235	 * there's a need for repeatable reads.
236	 */
237	while (!quit) {
238		/* Pick a key at random, and look it up. */
239		n = rand() % nlist;
240		key.data = list[n];
241		key.size = strlen(key.data);
242
243		if (verbose) {
244			sprintf(buf, "reader: %d: list entry %d\n", id, n);
245			write(STDOUT_FILENO, buf, strlen(buf));
246		}
247
248		switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
249		case DB_LOCK_DEADLOCK:		/* Deadlock. */
250			++perf[id].aborts;
251			break;
252		case 0:				/* Success. */
253			++perf[id].found;
254			free(data.data);
255			break;
256		case DB_NOTFOUND:		/* Not found. */
257			++perf[id].notfound;
258			break;
259		default:
260			sprintf(buf,
261			    "reader %d: dbp->get: %s", id, (char *)key.data);
262			fatal(buf, ret, 0);
263		}
264	}
265	return (0);
266}
267
268int
269writer(id)
270	int id;
271{
272	DBT key, data;
273	DB_TXN *tid;
274	time_t now, then;
275	int n, ret;
276	char buf[256], dbuf[10000];
277
278	time(&now);
279	then = now;
280
281	/*
282	 * DBT's must use local memory or malloc'd memory if the DB handle
283	 * is accessed in a threaded fashion.
284	 */
285	memset(&key, 0, sizeof(DBT));
286	memset(&data, 0, sizeof(DBT));
287	data.data = dbuf;
288	data.ulen = sizeof(dbuf);
289	data.flags = DB_DBT_USERMEM;
290
291	while (!quit) {
292		/* Pick a random key. */
293		n = rand() % nlist;
294		key.data = list[n];
295		key.size = strlen(key.data);
296
297		if (verbose) {
298			sprintf(buf, "writer: %d: list entry %d\n", id, n);
299			write(STDOUT_FILENO, buf, strlen(buf));
300		}
301
302		/* Abort and retry. */
303		if (0) {
304retry:			if ((ret = tid->abort(tid)) != 0)
305				fatal("DB_TXN->abort", ret, 1);
306			++perf[id].aborts;
307			++perf[id].aborted;
308		}
309
310		/* Thread #1 prints out the stats every 20 seconds. */
311		if (id == 1) {
312			time(&now);
313			if (now - then >= 20) {
314				stats();
315				then = now;
316			}
317		}
318
319		/* Begin the transaction. */
320		if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0)
321			fatal("txn_begin", ret, 1);
322
323		/*
324		 * Get the key.  If it doesn't exist, add it.  If it does
325		 * exist, delete it.
326		 */
327		switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
328		case DB_LOCK_DEADLOCK:
329			goto retry;
330		case 0:
331			goto delete;
332		case DB_NOTFOUND:
333			goto add;
334		}
335
336		sprintf(buf, "writer: %d: dbp->get", id);
337		fatal(buf, ret, 1);
338		/* NOTREACHED */
339
340delete:		/* Delete the key. */
341		switch (ret = dbp->del(dbp, tid, &key, 0)) {
342		case DB_LOCK_DEADLOCK:
343			goto retry;
344		case 0:
345			++perf[id].deletes;
346			goto commit;
347		}
348
349		sprintf(buf, "writer: %d: dbp->del", id);
350		fatal(buf, ret, 1);
351		/* NOTREACHED */
352
353add:		/* Add the key.  1 data item in 30 is an overflow item. */
354		data.size = 20 + rand() % 128;
355		if (rand() % 30 == 0)
356			data.size += 8192;
357
358		switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
359		case DB_LOCK_DEADLOCK:
360			goto retry;
361		case 0:
362			++perf[id].adds;
363			goto commit;
364		default:
365			sprintf(buf, "writer: %d: dbp->put", id);
366			fatal(buf, ret, 1);
367		}
368
369commit:		/* The transaction finished, commit it. */
370		if ((ret = tid->commit(tid, 0)) != 0)
371			fatal("DB_TXN->commit", ret, 1);
372
373		/*
374		 * Every time the thread completes 20 transactions, show
375		 * our progress.
376		 */
377		if (++perf[id].txns % 20 == 0) {
378			sprintf(buf,
379"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
380			    id, perf[id].adds, perf[id].deletes,
381			    perf[id].aborts, perf[id].txns);
382			write(STDOUT_FILENO, buf, strlen(buf));
383		}
384
385		/*
386		 * If this thread was aborted more than 5 times before
387		 * the transaction finished, complain.
388		 */
389		if (perf[id].aborted > 5) {
390			sprintf(buf,
391"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n",
392			    id, perf[id].adds, perf[id].deletes,
393			    perf[id].aborts, perf[id].txns, perf[id].aborted);
394			write(STDOUT_FILENO, buf, strlen(buf));
395		}
396		perf[id].aborted = 0;
397	}
398	return (0);
399}
400
401/*
402 * stats --
403 *	Display reader/writer thread statistics.  To display the statistics
404 *	for the mpool trickle or deadlock threads, use db_stat(1).
405 */
406void
407stats()
408{
409	int id;
410	char *p, buf[8192];
411
412	p = buf + sprintf(buf, "-------------\n");
413	for (id = 0; id < nreaders + nwriters;)
414		if (id++ < nwriters)
415			p += sprintf(p,
416	"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n",
417			    id, perf[id].adds,
418			    perf[id].deletes, perf[id].aborts, perf[id].txns);
419		else
420			p += sprintf(p,
421	"reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n",
422			    id, perf[id].found,
423			    perf[id].notfound, perf[id].aborts);
424	p += sprintf(p, "-------------\n");
425
426	write(STDOUT_FILENO, buf, p - buf);
427}
428
429/*
430 * db_init --
431 *	Initialize the environment.
432 */
433int
434db_init(home)
435	const char *home;
436{
437	int ret;
438
439	if ((ret = db_env_create(&dbenv, 0)) != 0) {
440		fprintf(stderr,
441		    "%s: db_env_create: %s\n", progname, db_strerror(ret));
442		return (EXIT_FAILURE);
443	}
444	if (punish)
445		(void)dbenv->set_flags(dbenv, DB_YIELDCPU, 1);
446
447	dbenv->set_errfile(dbenv, stderr);
448	dbenv->set_errpfx(dbenv, progname);
449	(void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
450	(void)dbenv->set_lg_max(dbenv, 200000);
451
452	if ((ret = dbenv->open(dbenv, home,
453	    DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
454	    DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
455		dbenv->err(dbenv, ret, NULL);
456		(void)dbenv->close(dbenv, 0);
457		return (EXIT_FAILURE);
458	}
459
460	return (0);
461}
462
463/*
464 * tstart --
465 *	Thread start function for readers and writers.
466 */
467void *
468tstart(arg)
469	void *arg;
470{
471	pthread_t tid;
472	u_int id;
473
474	id = (uintptr_t)arg + 1;
475
476	tid = pthread_self();
477
478	if (id <= (u_int)nwriters) {
479		printf("write thread %d starting: tid: %lu\n", id, (u_long)tid);
480		fflush(stdout);
481		writer(id);
482	} else {
483		printf("read thread %d starting: tid: %lu\n", id, (u_long)tid);
484		fflush(stdout);
485		reader(id);
486	}
487
488	/* NOTREACHED */
489	return (NULL);
490}
491
492/*
493 * deadlock --
494 *	Thread start function for DB_ENV->lock_detect.
495 */
496void *
497deadlock(arg)
498	void *arg;
499{
500	struct timeval t;
501	pthread_t tid;
502
503	arg = arg;				/* XXX: shut the compiler up. */
504	tid = pthread_self();
505
506	printf("deadlock thread starting: tid: %lu\n", (u_long)tid);
507	fflush(stdout);
508
509	t.tv_sec = 0;
510	t.tv_usec = 100000;
511	while (!quit) {
512		(void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL);
513
514		/* Check every 100ms. */
515		(void)select(0, NULL, NULL, NULL, &t);
516	}
517
518	return (NULL);
519}
520
521/*
522 * trickle --
523 *	Thread start function for memp_trickle.
524 */
525void *
526trickle(arg)
527	void *arg;
528{
529	pthread_t tid;
530	int wrote;
531	char buf[64];
532
533	arg = arg;				/* XXX: shut the compiler up. */
534	tid = pthread_self();
535
536	printf("trickle thread starting: tid: %lu\n", (u_long)tid);
537	fflush(stdout);
538
539	while (!quit) {
540		(void)dbenv->memp_trickle(dbenv, 10, &wrote);
541		if (verbose) {
542			sprintf(buf, "trickle: wrote %d\n", wrote);
543			write(STDOUT_FILENO, buf, strlen(buf));
544		}
545		if (wrote == 0) {
546			sleep(1);
547			sched_yield();
548		}
549	}
550
551	return (NULL);
552}
553
554/*
555 * word --
556 *	Build the dictionary word list.
557 */
558void
559word()
560{
561	FILE *fp;
562	int cnt;
563	char buf[256];
564
565	if ((fp = fopen(WORDLIST, "r")) == NULL)
566		fatal(WORDLIST, errno, 1);
567
568	if ((list = malloc(nlist * sizeof(char *))) == NULL)
569		fatal(NULL, errno, 1);
570
571	for (cnt = 0; cnt < nlist; ++cnt) {
572		if (fgets(buf, sizeof(buf), fp) == NULL)
573			break;
574		if ((list[cnt] = strdup(buf)) == NULL)
575			fatal(NULL, errno, 1);
576	}
577	nlist = cnt;		/* In case nlist was larger than possible. */
578}
579
580/*
581 * fatal --
582 *	Report a fatal error and quit.
583 */
584void
585fatal(msg, err, syserr)
586	const char *msg;
587	int err, syserr;
588{
589	fprintf(stderr, "%s: ", progname);
590	if (msg != NULL) {
591		fprintf(stderr, "%s", msg);
592		if (syserr)
593			fprintf(stderr, ": ");
594	}
595	if (syserr)
596		fprintf(stderr, "%s", strerror(err));
597	fprintf(stderr, "\n");
598	exit(EXIT_FAILURE);
599
600	/* NOTREACHED */
601}
602
603/*
604 * usage --
605 *	Usage message.
606 */
607int
608usage()
609{
610	(void)fprintf(stderr,
611    "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n",
612	    progname);
613	return (EXIT_FAILURE);
614}
615
616/*
617 * onint --
618 *	Interrupt signal handler.
619 */
620void
621onint(signo)
622	int signo;
623{
624	signo = 0;		/* Quiet compiler. */
625	quit = 1;
626}
627