1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9/* File: txn_guide.c */ 10 11/* We assume an ANSI-compatible compiler */ 12#include <stdio.h> 13#include <stdlib.h> 14#include <string.h> 15#include <db.h> 16 17#ifdef _WIN32 18#include <windows.h> 19#define PATHD '\\' 20extern int getopt(int, char * const *, const char *); 21extern char *optarg; 22 23/* Wrap Windows thread API to make it look POSIXey. */ 24typedef HANDLE thread_t; 25#define thread_create(thrp, attr, func, arg) \ 26 (((*(thrp) = CreateThread(NULL, 0, \ 27 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 28#define thread_join(thr, statusp) \ 29 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 30 ((statusp == NULL) ? 0 : \ 31 (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1))) 32 33typedef HANDLE mutex_t; 34#define mutex_init(m, attr) \ 35 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 36#define mutex_lock(m) \ 37 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 38#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 39#else 40#include <pthread.h> 41#include <unistd.h> 42#define PATHD '/' 43 44typedef pthread_t thread_t; 45#define thread_create(thrp, attr, func, arg) \ 46 pthread_create((thrp), (attr), (func), (arg)) 47#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 48 49typedef pthread_mutex_t mutex_t; 50#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 51#define mutex_lock(m) pthread_mutex_lock(m) 52#define mutex_unlock(m) pthread_mutex_unlock(m) 53#endif 54 55/* Run 5 writers threads at a time. */ 56#define NUMWRITERS 5 57 58/* 59 * Printing of a thread_t is implementation-specific, so we 60 * create our own thread IDs for reporting purposes. 61 */ 62int global_thread_num; 63mutex_t thread_num_lock; 64 65/* Forward declarations */ 66int count_records(DB *, DB_TXN *); 67int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t); 68int usage(void); 69void *writer_thread(void *); 70 71/* Usage function */ 72int 73usage() 74{ 75 fprintf(stderr, " [-h <database_home_directory>]\n"); 76 return (EXIT_FAILURE); 77} 78 79int 80main(int argc, char *argv[]) 81{ 82 /* Initialize our handles */ 83 DB *dbp = NULL; 84 DB_ENV *envp = NULL; 85 86 thread_t writer_threads[NUMWRITERS]; 87 int ch, i, ret, ret_t; 88 u_int32_t env_flags; 89 char *db_home_dir; 90 /* Application name */ 91 const char *prog_name = "txn_guide"; 92 /* Database file name */ 93 const char *file_name = "mydb.db"; 94 95 /* Parse the command line arguments */ 96#ifdef _WIN32 97 db_home_dir = ".\\"; 98#else 99 db_home_dir = "./"; 100#endif 101 while ((ch = getopt(argc, argv, "h:")) != EOF) 102 switch (ch) { 103 case 'h': 104 db_home_dir = optarg; 105 break; 106 case '?': 107 default: 108 return (usage()); 109 } 110 111 /* Create the environment */ 112 ret = db_env_create(&envp, 0); 113 if (ret != 0) { 114 fprintf(stderr, "Error creating environment handle: %s\n", 115 db_strerror(ret)); 116 goto err; 117 } 118 119 /* 120 * Indicate that we want db to perform lock detection internally. 121 * Also indicate that the transaction with the fewest number of 122 * write locks will receive the deadlock notification in 123 * the event of a deadlock. 124 */ 125 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE); 126 if (ret != 0) { 127 fprintf(stderr, "Error setting lock detect: %s\n", 128 db_strerror(ret)); 129 goto err; 130 } 131 132 env_flags = 133 DB_CREATE | /* Create the environment if it does not exist */ 134 DB_RECOVER | /* Run normal recovery. */ 135 DB_INIT_LOCK | /* Initialize the locking subsystem */ 136 DB_INIT_LOG | /* Initialize the logging subsystem */ 137 DB_INIT_TXN | /* Initialize the transactional subsystem. This 138 * also turns on logging. */ 139 DB_INIT_MPOOL | /* Initialize the memory pool (in-memory cache) */ 140 DB_THREAD; /* Cause the environment to be free-threaded */ 141 142 /* Now actually open the environment */ 143 ret = envp->open(envp, db_home_dir, env_flags, 0); 144 if (ret != 0) { 145 fprintf(stderr, "Error opening environment: %s\n", 146 db_strerror(ret)); 147 goto err; 148 } 149 150 /* 151 * If we had utility threads (for running checkpoints or 152 * deadlock detection, for example) we would spawn those 153 * here. However, for a simple example such as this, 154 * that is not required. 155 */ 156 157 /* Open the database */ 158 ret = open_db(&dbp, prog_name, file_name, 159 envp, DB_DUPSORT); 160 if (ret != 0) 161 goto err; 162 163 /* Initialize a mutex. Used to help provide thread ids. */ 164 (void)mutex_init(&thread_num_lock, NULL); 165 166 /* Start the writer threads. */ 167 for (i = 0; i < NUMWRITERS; i++) 168 (void)thread_create( 169 &writer_threads[i], NULL, writer_thread, (void *)dbp); 170 171 /* Join the writers */ 172 for (i = 0; i < NUMWRITERS; i++) 173 (void)thread_join(writer_threads[i], NULL); 174 175err: 176 /* Close our database handle, if it was opened. */ 177 if (dbp != NULL) { 178 ret_t = dbp->close(dbp, 0); 179 if (ret_t != 0) { 180 fprintf(stderr, "%s database close failed: %s\n", 181 file_name, db_strerror(ret_t)); 182 ret = ret_t; 183 } 184 } 185 186 /* Close our environment, if it was opened. */ 187 if (envp != NULL) { 188 ret_t = envp->close(envp, 0); 189 if (ret_t != 0) { 190 fprintf(stderr, "environment close failed: %s\n", 191 db_strerror(ret_t)); 192 ret = ret_t; 193 } 194 } 195 196 /* Final status message and return. */ 197 printf("I'm all done.\n"); 198 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE); 199} 200 201/* 202 * A function that performs a series of writes to a 203 * Berkeley DB database. The information written 204 * to the database is largely nonsensical, but the 205 * mechanism of transactional commit/abort and 206 * deadlock detection is illustrated here. 207 */ 208void * 209writer_thread(void *args) 210{ 211 static char *key_strings[] = { 212 "key 1", "key 2", "key 3", "key 4", "key 5", 213 "key 6", "key 7", "key 8", "key 9", "key 10" 214 }; 215 DB *dbp; 216 DB_ENV *envp; 217 DBT key, value; 218 DB_TXN *txn; 219 int i, j, payload, ret, thread_num; 220 int retry_count, max_retries = 20; /* Max retry on a deadlock */ 221 222 dbp = (DB *)args; 223 envp = dbp->get_env(dbp); 224 225 /* Get the thread number */ 226 (void)mutex_lock(&thread_num_lock); 227 global_thread_num++; 228 thread_num = global_thread_num; 229 (void)mutex_unlock(&thread_num_lock); 230 231 /* Initialize the random number generator */ 232 srand(thread_num); 233 234 /* Write 50 times and then quit */ 235 for (i = 0; i < 50; i++) { 236 retry_count = 0; /* Used for deadlock retries */ 237 238 /* 239 * Some think it is bad form to loop with a goto statement, but 240 * we do it anyway because it is the simplest and clearest way 241 * to achieve our abort/retry operation. 242 */ 243retry: 244 /* Begin our transaction. We group multiple writes in 245 * this thread under a single transaction so as to 246 * (1) show that you can atomically perform multiple writes 247 * at a time, and (2) to increase the chances of a 248 * deadlock occurring so that we can observe our 249 * deadlock detection at work. 250 * 251 * Normally we would want to avoid the potential for deadlocks, 252 * so for this workload the correct thing would be to perform our 253 * puts with autocommit. But that would excessively simplify our 254 * example, so we do the "wrong" thing here instead. 255 */ 256 ret = envp->txn_begin(envp, NULL, &txn, 0); 257 if (ret != 0) { 258 envp->err(envp, ret, "txn_begin failed"); 259 return ((void *)EXIT_FAILURE); 260 } 261 for (j = 0; j < 10; j++) { 262 /* Set up our key and values DBTs */ 263 memset(&key, 0, sizeof(DBT)); 264 key.data = key_strings[j]; 265 key.size = (u_int32_t)strlen(key_strings[j]) + 1; 266 267 memset(&value, 0, sizeof(DBT)); 268 payload = rand() + i; 269 value.data = &payload; 270 value.size = sizeof(int); 271 272 /* Perform the database put. */ 273 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) { 274 case 0: 275 break; 276 /* 277 * Our database is configured for sorted duplicates, 278 * so there is a potential for a KEYEXIST error return. 279 * If we get one, simply ignore it and continue on. 280 * 281 * Note that you will see KEYEXIST errors only after you 282 * have run this program at least once. 283 */ 284 case DB_KEYEXIST: 285 printf("Got keyexists.\n"); 286 break; 287 /* 288 * Here's where we perform deadlock detection. If 289 * DB_LOCK_DEADLOCK is returned by the put operation, 290 * then this thread has been chosen to break a deadlock. 291 * It must abort its operation, and optionally retry the 292 * put. 293 */ 294 case DB_LOCK_DEADLOCK: 295 /* 296 * First thing that we MUST do is abort the 297 * transaction. 298 */ 299 (void)txn->abort(txn); 300 /* 301 * Now we decide if we want to retry the operation. 302 * If we have retried less than max_retries, 303 * increment the retry count and goto retry. 304 */ 305 if (retry_count < max_retries) { 306 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n", 307 thread_num); 308 printf("Writer %i: Retrying write operation.\n", 309 thread_num); 310 retry_count++; 311 goto retry; 312 } 313 /* 314 * Otherwise, just give up. 315 */ 316 printf("Writer %i: ", thread_num); 317 printf("Got DB_LOCK_DEADLOCK and out of retries.\n"); 318 printf("Writer %i: Giving up.\n", thread_num); 319 return ((void *)EXIT_FAILURE); 320 /* 321 * If a generic error occurs, we simply abort the 322 * transaction and exit the thread completely. 323 */ 324 default: 325 envp->err(envp, ret, "db put failed"); 326 ret = txn->abort(txn); 327 if (ret != 0) 328 envp->err(envp, ret, 329 "txn abort failed"); 330 return ((void *)EXIT_FAILURE); 331 } /** End case statement **/ 332 333 } /** End for loop **/ 334 335 /* 336 * print the number of records found in the database. 337 * See count_records() for usage information. 338 */ 339 printf("Thread %i. Record count: %i\n", thread_num, 340 count_records(dbp, NULL)); 341 342 /* 343 * If all goes well, we can commit the transaction and 344 * exit the thread. 345 */ 346 ret = txn->commit(txn, 0); 347 if (ret != 0) { 348 envp->err(envp, ret, "txn commit failed"); 349 return ((void *)EXIT_FAILURE); 350 } 351 } 352 return ((void *)EXIT_SUCCESS); 353} 354 355/* 356 * This simply counts the number of records contained in the 357 * database and returns the result. You can use this function 358 * in three ways: 359 * 360 * First call it with an active txn handle. 361 * Secondly, configure the cursor for uncommitted reads (this 362 * is what the example currently does). 363 * Third, call count_records AFTER the writer has committed 364 * its transaction. 365 * 366 * If you do none of these things, the writer thread will 367 * self-deadlock. 368 * 369 * Note that this function exists only for illustrative purposes. 370 * A more straight-forward way to count the number of records in 371 * a database is to use DB->stat() or DB->stat_print(). 372 */ 373 374int 375count_records(DB *dbp, DB_TXN *txn) 376{ 377 378 DBT key, value; 379 DBC *cursorp; 380 int count, ret; 381 382 cursorp = NULL; 383 count = 0; 384 385 /* Get the cursor */ 386 ret = dbp->cursor(dbp, txn, &cursorp, 387 DB_READ_UNCOMMITTED); 388 if (ret != 0) { 389 dbp->err(dbp, ret, 390 "count_records: cursor open failed."); 391 goto cursor_err; 392 } 393 394 /* Get the key DBT used for the database read */ 395 memset(&key, 0, sizeof(DBT)); 396 memset(&value, 0, sizeof(DBT)); 397 do { 398 ret = cursorp->get(cursorp, &key, &value, DB_NEXT); 399 switch (ret) { 400 case 0: 401 count++; 402 break; 403 case DB_NOTFOUND: 404 break; 405 default: 406 dbp->err(dbp, ret, 407 "Count records unspecified error"); 408 goto cursor_err; 409 } 410 } while (ret == 0); 411 412cursor_err: 413 if (cursorp != NULL) { 414 ret = cursorp->close(cursorp); 415 if (ret != 0) { 416 dbp->err(dbp, ret, 417 "count_records: cursor close failed."); 418 } 419 } 420 421 return (count); 422} 423 424/* Open a Berkeley DB database */ 425int 426open_db(DB **dbpp, const char *progname, const char *file_name, 427 DB_ENV *envp, u_int32_t extra_flags) 428{ 429 int ret; 430 u_int32_t open_flags; 431 DB *dbp; 432 433 /* Initialize the DB handle */ 434 ret = db_create(&dbp, envp, 0); 435 if (ret != 0) { 436 fprintf(stderr, "%s: %s\n", progname, 437 db_strerror(ret)); 438 return (EXIT_FAILURE); 439 } 440 441 /* Point to the memory malloc'd by db_create() */ 442 *dbpp = dbp; 443 444 if (extra_flags != 0) { 445 ret = dbp->set_flags(dbp, extra_flags); 446 if (ret != 0) { 447 dbp->err(dbp, ret, 448 "open_db: Attempt to set extra flags failed."); 449 return (EXIT_FAILURE); 450 } 451 } 452 453 /* Now open the database */ 454 open_flags = DB_CREATE | /* Allow database creation */ 455 DB_READ_UNCOMMITTED | /* Allow dirty reads */ 456 DB_AUTO_COMMIT; /* Allow autocommit */ 457 458 ret = dbp->open(dbp, /* Pointer to the database */ 459 NULL, /* Txn pointer */ 460 file_name, /* File name */ 461 NULL, /* Logical db name */ 462 DB_BTREE, /* Database type (using btree) */ 463 open_flags, /* Open flags */ 464 0); /* File mode. Using defaults */ 465 if (ret != 0) { 466 dbp->err(dbp, ret, "Database '%s' open failed", 467 file_name); 468 return (EXIT_FAILURE); 469 } 470 return (EXIT_SUCCESS); 471} 472