1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1997,2008 Oracle. All rights reserved. 5 * 6 * $Id: ex_thread.c,v 12.8 2008/01/08 20:58:23 bostic Exp $ 7 */ 8 9#include <sys/types.h> 10#include <sys/time.h> 11 12#include <errno.h> 13#include <pthread.h> 14#include <signal.h> 15#include <stdio.h> 16#include <stdlib.h> 17#include <string.h> 18#include <time.h> 19 20#ifdef _WIN32 21extern int getopt(int, char * const *, const char *); 22#else 23#include <unistd.h> 24#endif 25 26#include <db.h> 27 28/* 29 * NB: This application is written using POSIX 1003.1b-1993 pthreads 30 * interfaces, which may not be portable to your system. 31 */ 32extern int sched_yield __P((void)); /* Pthread yield function. */ 33 34int db_init __P((const char *)); 35void *deadlock __P((void *)); 36void fatal __P((const char *, int, int)); 37void onint __P((int)); 38int main __P((int, char *[])); 39int reader __P((int)); 40void stats __P((void)); 41void *trickle __P((void *)); 42void *tstart __P((void *)); 43int usage __P((void)); 44void word __P((void)); 45int writer __P((int)); 46 47int quit; /* Interrupt handling flag. */ 48 49struct _statistics { 50 int aborted; /* Write. */ 51 int aborts; /* Read/write. */ 52 int adds; /* Write. */ 53 int deletes; /* Write. */ 54 int txns; /* Write. */ 55 int found; /* Read. */ 56 int notfound; /* Read. */ 57} *perf; 58 59const char 60 *progname = "ex_thread"; /* Program name. */ 61 62#define DATABASE "access.db" /* Database name. */ 63#define WORDLIST "../test/wordlist" /* Dictionary. */ 64 65/* 66 * We can seriously increase the number of collisions and transaction 67 * aborts by yielding the scheduler after every DB call. Specify the 68 * -p option to do this. 69 */ 70int punish; /* -p */ 71int nlist; /* -n */ 72int nreaders; /* -r */ 73int verbose; /* -v */ 74int nwriters; /* -w */ 75 76DB *dbp; /* Database handle. */ 77DB_ENV *dbenv; /* Database environment. */ 78int nthreads; /* Total threads. */ 79char **list; /* Word list. */ 80 81/* 82 * ex_thread -- 83 * Run a simple threaded application of some numbers of readers and 84 * writers competing for a set of words. 85 * 86 * Example UNIX shell script to run this program: 87 * % rm -rf TESTDIR 88 * % mkdir TESTDIR 89 * % ex_thread -h TESTDIR 90 */ 91int 92main(argc, argv) 93 int argc; 94 char *argv[]; 95{ 96 extern char *optarg; 97 extern int errno, optind; 98 DB_TXN *txnp; 99 pthread_t *tids; 100 int ch, i, ret; 101 const char *home; 102 void *retp; 103 104 txnp = NULL; 105 nlist = 1000; 106 nreaders = nwriters = 4; 107 home = "TESTDIR"; 108 while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF) 109 switch (ch) { 110 case 'h': 111 home = optarg; 112 break; 113 case 'p': 114 punish = 1; 115 break; 116 case 'n': 117 nlist = atoi(optarg); 118 break; 119 case 'r': 120 nreaders = atoi(optarg); 121 break; 122 case 'v': 123 verbose = 1; 124 break; 125 case 'w': 126 nwriters = atoi(optarg); 127 break; 128 case '?': 129 default: 130 return (usage()); 131 } 132 argc -= optind; 133 argv += optind; 134 135 /* Initialize the random number generator. */ 136 srand(getpid() | time(NULL)); 137 138 /* Register the signal handler. */ 139 (void)signal(SIGINT, onint); 140 141 /* Build the key list. */ 142 word(); 143 144 /* Remove the previous database. */ 145 (void)remove(DATABASE); 146 147 /* Initialize the database environment. */ 148 if ((ret = db_init(home)) != 0) 149 return (ret); 150 151 /* Initialize the database. */ 152 if ((ret = db_create(&dbp, dbenv, 0)) != 0) { 153 dbenv->err(dbenv, ret, "db_create"); 154 (void)dbenv->close(dbenv, 0); 155 return (EXIT_FAILURE); 156 } 157 if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) { 158 dbp->err(dbp, ret, "set_pagesize"); 159 goto err; 160 } 161 162 if ((ret = dbenv->txn_begin(dbenv, NULL, &txnp, 0)) != 0) 163 fatal("txn_begin", ret, 1); 164 if ((ret = dbp->open(dbp, txnp, 165 DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) { 166 dbp->err(dbp, ret, "%s: open", DATABASE); 167 goto err; 168 } else { 169 ret = txnp->commit(txnp, 0); 170 txnp = NULL; 171 if (ret != 0) 172 goto err; 173 } 174 175 nthreads = nreaders + nwriters + 2; 176 printf("Running: readers %d, writers %d\n", nreaders, nwriters); 177 fflush(stdout); 178 179 /* Create statistics structures, offset by 1. */ 180 if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL) 181 fatal(NULL, errno, 1); 182 183 /* Create thread ID structures. */ 184 if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL) 185 fatal(NULL, errno, 1); 186 187 /* Create reader/writer threads. */ 188 for (i = 0; i < nreaders + nwriters; ++i) 189 if ((ret = pthread_create( 190 &tids[i], NULL, tstart, (void *)(uintptr_t)i)) != 0) 191 fatal("pthread_create", ret > 0 ? ret : errno, 1); 192 193 /* Create buffer pool trickle thread. */ 194 if (pthread_create(&tids[i], NULL, trickle, &i)) 195 fatal("pthread_create", errno, 1); 196 ++i; 197 198 /* Create deadlock detector thread. */ 199 if (pthread_create(&tids[i], NULL, deadlock, &i)) 200 fatal("pthread_create", errno, 1); 201 202 /* Wait for the threads. */ 203 for (i = 0; i < nthreads; ++i) 204 (void)pthread_join(tids[i], &retp); 205 206 printf("Exiting\n"); 207 stats(); 208 209err: if (txnp != NULL) 210 (void)txnp->abort(txnp); 211 (void)dbp->close(dbp, 0); 212 (void)dbenv->close(dbenv, 0); 213 214 return (EXIT_SUCCESS); 215} 216 217int 218reader(id) 219 int id; 220{ 221 DBT key, data; 222 int n, ret; 223 char buf[64]; 224 225 /* 226 * DBT's must use local memory or malloc'd memory if the DB handle 227 * is accessed in a threaded fashion. 228 */ 229 memset(&key, 0, sizeof(DBT)); 230 memset(&data, 0, sizeof(DBT)); 231 data.flags = DB_DBT_MALLOC; 232 233 /* 234 * Read-only threads do not require transaction protection, unless 235 * there's a need for repeatable reads. 236 */ 237 while (!quit) { 238 /* Pick a key at random, and look it up. */ 239 n = rand() % nlist; 240 key.data = list[n]; 241 key.size = strlen(key.data); 242 243 if (verbose) { 244 sprintf(buf, "reader: %d: list entry %d\n", id, n); 245 write(STDOUT_FILENO, buf, strlen(buf)); 246 } 247 248 switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) { 249 case DB_LOCK_DEADLOCK: /* Deadlock. */ 250 ++perf[id].aborts; 251 break; 252 case 0: /* Success. */ 253 ++perf[id].found; 254 free(data.data); 255 break; 256 case DB_NOTFOUND: /* Not found. */ 257 ++perf[id].notfound; 258 break; 259 default: 260 sprintf(buf, 261 "reader %d: dbp->get: %s", id, (char *)key.data); 262 fatal(buf, ret, 0); 263 } 264 } 265 return (0); 266} 267 268int 269writer(id) 270 int id; 271{ 272 DBT key, data; 273 DB_TXN *tid; 274 time_t now, then; 275 int n, ret; 276 char buf[256], dbuf[10000]; 277 278 time(&now); 279 then = now; 280 281 /* 282 * DBT's must use local memory or malloc'd memory if the DB handle 283 * is accessed in a threaded fashion. 284 */ 285 memset(&key, 0, sizeof(DBT)); 286 memset(&data, 0, sizeof(DBT)); 287 data.data = dbuf; 288 data.ulen = sizeof(dbuf); 289 data.flags = DB_DBT_USERMEM; 290 291 while (!quit) { 292 /* Pick a random key. */ 293 n = rand() % nlist; 294 key.data = list[n]; 295 key.size = strlen(key.data); 296 297 if (verbose) { 298 sprintf(buf, "writer: %d: list entry %d\n", id, n); 299 write(STDOUT_FILENO, buf, strlen(buf)); 300 } 301 302 /* Abort and retry. */ 303 if (0) { 304retry: if ((ret = tid->abort(tid)) != 0) 305 fatal("DB_TXN->abort", ret, 1); 306 ++perf[id].aborts; 307 ++perf[id].aborted; 308 } 309 310 /* Thread #1 prints out the stats every 20 seconds. */ 311 if (id == 1) { 312 time(&now); 313 if (now - then >= 20) { 314 stats(); 315 then = now; 316 } 317 } 318 319 /* Begin the transaction. */ 320 if ((ret = dbenv->txn_begin(dbenv, NULL, &tid, 0)) != 0) 321 fatal("txn_begin", ret, 1); 322 323 /* 324 * Get the key. If it doesn't exist, add it. If it does 325 * exist, delete it. 326 */ 327 switch (ret = dbp->get(dbp, tid, &key, &data, 0)) { 328 case DB_LOCK_DEADLOCK: 329 goto retry; 330 case 0: 331 goto delete; 332 case DB_NOTFOUND: 333 goto add; 334 } 335 336 sprintf(buf, "writer: %d: dbp->get", id); 337 fatal(buf, ret, 1); 338 /* NOTREACHED */ 339 340delete: /* Delete the key. */ 341 switch (ret = dbp->del(dbp, tid, &key, 0)) { 342 case DB_LOCK_DEADLOCK: 343 goto retry; 344 case 0: 345 ++perf[id].deletes; 346 goto commit; 347 } 348 349 sprintf(buf, "writer: %d: dbp->del", id); 350 fatal(buf, ret, 1); 351 /* NOTREACHED */ 352 353add: /* Add the key. 1 data item in 30 is an overflow item. */ 354 data.size = 20 + rand() % 128; 355 if (rand() % 30 == 0) 356 data.size += 8192; 357 358 switch (ret = dbp->put(dbp, tid, &key, &data, 0)) { 359 case DB_LOCK_DEADLOCK: 360 goto retry; 361 case 0: 362 ++perf[id].adds; 363 goto commit; 364 default: 365 sprintf(buf, "writer: %d: dbp->put", id); 366 fatal(buf, ret, 1); 367 } 368 369commit: /* The transaction finished, commit it. */ 370 if ((ret = tid->commit(tid, 0)) != 0) 371 fatal("DB_TXN->commit", ret, 1); 372 373 /* 374 * Every time the thread completes 20 transactions, show 375 * our progress. 376 */ 377 if (++perf[id].txns % 20 == 0) { 378 sprintf(buf, 379"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n", 380 id, perf[id].adds, perf[id].deletes, 381 perf[id].aborts, perf[id].txns); 382 write(STDOUT_FILENO, buf, strlen(buf)); 383 } 384 385 /* 386 * If this thread was aborted more than 5 times before 387 * the transaction finished, complain. 388 */ 389 if (perf[id].aborted > 5) { 390 sprintf(buf, 391"writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2d\n", 392 id, perf[id].adds, perf[id].deletes, 393 perf[id].aborts, perf[id].txns, perf[id].aborted); 394 write(STDOUT_FILENO, buf, strlen(buf)); 395 } 396 perf[id].aborted = 0; 397 } 398 return (0); 399} 400 401/* 402 * stats -- 403 * Display reader/writer thread statistics. To display the statistics 404 * for the mpool trickle or deadlock threads, use db_stat(1). 405 */ 406void 407stats() 408{ 409 int id; 410 char *p, buf[8192]; 411 412 p = buf + sprintf(buf, "-------------\n"); 413 for (id = 0; id < nreaders + nwriters;) 414 if (id++ < nwriters) 415 p += sprintf(p, 416 "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d\n", 417 id, perf[id].adds, 418 perf[id].deletes, perf[id].aborts, perf[id].txns); 419 else 420 p += sprintf(p, 421 "reader: %2d: found: %5d: notfound: %5d: aborts: %4d\n", 422 id, perf[id].found, 423 perf[id].notfound, perf[id].aborts); 424 p += sprintf(p, "-------------\n"); 425 426 write(STDOUT_FILENO, buf, p - buf); 427} 428 429/* 430 * db_init -- 431 * Initialize the environment. 432 */ 433int 434db_init(home) 435 const char *home; 436{ 437 int ret; 438 439 if ((ret = db_env_create(&dbenv, 0)) != 0) { 440 fprintf(stderr, 441 "%s: db_env_create: %s\n", progname, db_strerror(ret)); 442 return (EXIT_FAILURE); 443 } 444 if (punish) 445 (void)dbenv->set_flags(dbenv, DB_YIELDCPU, 1); 446 447 dbenv->set_errfile(dbenv, stderr); 448 dbenv->set_errpfx(dbenv, progname); 449 (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0); 450 (void)dbenv->set_lg_max(dbenv, 200000); 451 452 if ((ret = dbenv->open(dbenv, home, 453 DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | 454 DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) { 455 dbenv->err(dbenv, ret, NULL); 456 (void)dbenv->close(dbenv, 0); 457 return (EXIT_FAILURE); 458 } 459 460 return (0); 461} 462 463/* 464 * tstart -- 465 * Thread start function for readers and writers. 466 */ 467void * 468tstart(arg) 469 void *arg; 470{ 471 pthread_t tid; 472 u_int id; 473 474 id = (uintptr_t)arg + 1; 475 476 tid = pthread_self(); 477 478 if (id <= (u_int)nwriters) { 479 printf("write thread %d starting: tid: %lu\n", id, (u_long)tid); 480 fflush(stdout); 481 writer(id); 482 } else { 483 printf("read thread %d starting: tid: %lu\n", id, (u_long)tid); 484 fflush(stdout); 485 reader(id); 486 } 487 488 /* NOTREACHED */ 489 return (NULL); 490} 491 492/* 493 * deadlock -- 494 * Thread start function for DB_ENV->lock_detect. 495 */ 496void * 497deadlock(arg) 498 void *arg; 499{ 500 struct timeval t; 501 pthread_t tid; 502 503 arg = arg; /* XXX: shut the compiler up. */ 504 tid = pthread_self(); 505 506 printf("deadlock thread starting: tid: %lu\n", (u_long)tid); 507 fflush(stdout); 508 509 t.tv_sec = 0; 510 t.tv_usec = 100000; 511 while (!quit) { 512 (void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL); 513 514 /* Check every 100ms. */ 515 (void)select(0, NULL, NULL, NULL, &t); 516 } 517 518 return (NULL); 519} 520 521/* 522 * trickle -- 523 * Thread start function for memp_trickle. 524 */ 525void * 526trickle(arg) 527 void *arg; 528{ 529 pthread_t tid; 530 int wrote; 531 char buf[64]; 532 533 arg = arg; /* XXX: shut the compiler up. */ 534 tid = pthread_self(); 535 536 printf("trickle thread starting: tid: %lu\n", (u_long)tid); 537 fflush(stdout); 538 539 while (!quit) { 540 (void)dbenv->memp_trickle(dbenv, 10, &wrote); 541 if (verbose) { 542 sprintf(buf, "trickle: wrote %d\n", wrote); 543 write(STDOUT_FILENO, buf, strlen(buf)); 544 } 545 if (wrote == 0) { 546 sleep(1); 547 sched_yield(); 548 } 549 } 550 551 return (NULL); 552} 553 554/* 555 * word -- 556 * Build the dictionary word list. 557 */ 558void 559word() 560{ 561 FILE *fp; 562 int cnt; 563 char buf[256]; 564 565 if ((fp = fopen(WORDLIST, "r")) == NULL) 566 fatal(WORDLIST, errno, 1); 567 568 if ((list = malloc(nlist * sizeof(char *))) == NULL) 569 fatal(NULL, errno, 1); 570 571 for (cnt = 0; cnt < nlist; ++cnt) { 572 if (fgets(buf, sizeof(buf), fp) == NULL) 573 break; 574 if ((list[cnt] = strdup(buf)) == NULL) 575 fatal(NULL, errno, 1); 576 } 577 nlist = cnt; /* In case nlist was larger than possible. */ 578} 579 580/* 581 * fatal -- 582 * Report a fatal error and quit. 583 */ 584void 585fatal(msg, err, syserr) 586 const char *msg; 587 int err, syserr; 588{ 589 fprintf(stderr, "%s: ", progname); 590 if (msg != NULL) { 591 fprintf(stderr, "%s", msg); 592 if (syserr) 593 fprintf(stderr, ": "); 594 } 595 if (syserr) 596 fprintf(stderr, "%s", strerror(err)); 597 fprintf(stderr, "\n"); 598 exit(EXIT_FAILURE); 599 600 /* NOTREACHED */ 601} 602 603/* 604 * usage -- 605 * Usage message. 606 */ 607int 608usage() 609{ 610 (void)fprintf(stderr, 611 "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]\n", 612 progname); 613 return (EXIT_FAILURE); 614} 615 616/* 617 * onint -- 618 * Interrupt signal handler. 619 */ 620void 621onint(signo) 622 int signo; 623{ 624 signo = 0; /* Quiet compiler. */ 625 quit = 1; 626} 627