1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9/* File: txn_guide_inmemory.c */ 10 11/* We assume an ANSI-compatible compiler */ 12#include <stdio.h> 13#include <stdlib.h> 14#include <string.h> 15#include <db.h> 16 17#ifdef _WIN32 18#include <windows.h> 19#define PATHD '\\' 20extern int getopt(int, char * const *, const char *); 21extern char *optarg; 22 23/* Wrap Windows thread API to make it look POSIXey. */ 24typedef HANDLE thread_t; 25#define thread_create(thrp, attr, func, arg) \ 26 (((*(thrp) = CreateThread(NULL, 0, \ 27 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 28#define thread_join(thr, statusp) \ 29 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 30 ((statusp == NULL) ? 0 : \ 31 (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1))) 32 33typedef HANDLE mutex_t; 34#define mutex_init(m, attr) \ 35 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 36#define mutex_lock(m) \ 37 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 38#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 39#else 40#include <pthread.h> 41#include <unistd.h> 42#define PATHD '/' 43 44typedef pthread_t thread_t; 45#define thread_create(thrp, attr, func, arg) \ 46 pthread_create((thrp), (attr), (func), (arg)) 47#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 48 49typedef pthread_mutex_t mutex_t; 50#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 51#define mutex_lock(m) pthread_mutex_lock(m) 52#define mutex_unlock(m) pthread_mutex_unlock(m) 53#endif 54 55/* Run 5 writers threads at a time. */ 56#define NUMWRITERS 5 57 58/* 59 * Printing of a thread_t is implementation-specific, so we 60 * create our own thread IDs for reporting purposes. 61 */ 62int global_thread_num; 63mutex_t thread_num_lock; 64 65/* Forward declarations */ 66int count_records(DB *, DB_TXN *); 67int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t); 68void *writer_thread(void *); 69 70int 71main(void) 72{ 73 /* Initialize our handles */ 74 DB *dbp = NULL; 75 DB_ENV *envp = NULL; 76 77 thread_t writer_threads[NUMWRITERS]; 78 int i, ret, ret_t; 79 u_int32_t env_flags; 80 81 /* Application name */ 82 const char *prog_name = "txn_guide_inmemory"; 83 84 /* Create the environment */ 85 ret = db_env_create(&envp, 0); 86 if (ret != 0) { 87 fprintf(stderr, "Error creating environment handle: %s\n", 88 db_strerror(ret)); 89 goto err; 90 } 91 92 env_flags = 93 DB_CREATE | /* Create the environment if it does not exist */ 94 DB_INIT_LOCK | /* Initialize the locking subsystem */ 95 DB_INIT_LOG | /* Initialize the logging subsystem */ 96 DB_INIT_TXN | /* Initialize the transactional subsystem. This 97 * also turns on logging. */ 98 DB_INIT_MPOOL | /* Initialize the memory pool (in-memory cache) */ 99 DB_PRIVATE | /* Region files are backed by heap memory. */ 100 DB_THREAD; /* Cause the environment to be free-threaded */ 101 102 /* Specify in-memory logging */ 103 ret = envp->log_set_config(envp, DB_LOG_IN_MEMORY, 1); 104 if (ret != 0) { 105 fprintf(stderr, "Error setting log subsystem to in-memory: %s\n", 106 db_strerror(ret)); 107 goto err; 108 } 109 110 /* 111 * Specify the size of the in-memory log buffer. 112 */ 113 ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024); 114 if (ret != 0) { 115 fprintf(stderr, "Error increasing the log buffer size: %s\n", 116 db_strerror(ret)); 117 goto err; 118 } 119 120 /* 121 * Specify the size of the in-memory cache. 122 */ 123 ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1); 124 if (ret != 0) { 125 fprintf(stderr, "Error increasing the cache size: %s\n", 126 db_strerror(ret)); 127 goto err; 128 } 129 130 /* 131 * Indicate that we want db to perform lock detection internally. 132 * Also indicate that the transaction with the fewest number of 133 * write locks will receive the deadlock notification in 134 * the event of a deadlock. 135 */ 136 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE); 137 if (ret != 0) { 138 fprintf(stderr, "Error setting lock detect: %s\n", 139 db_strerror(ret)); 140 goto err; 141 } 142 143 /* Now actually open the environment */ 144 ret = envp->open(envp, NULL, env_flags, 0); 145 if (ret != 0) { 146 fprintf(stderr, "Error opening environment: %s\n", 147 db_strerror(ret)); 148 goto err; 149 } 150 151 /* 152 * If we had utility threads (for running checkpoints or 153 * deadlock detection, for example) we would spawn those 154 * here. However, for a simple example such as this, 155 * that is not required. 156 */ 157 158 /* Open the database */ 159 ret = open_db(&dbp, prog_name, NULL, 160 envp, DB_DUPSORT); 161 if (ret != 0) 162 goto err; 163 164 /* Initialize a mutex. Used to help provide thread ids. */ 165 (void)mutex_init(&thread_num_lock, NULL); 166 167 /* Start the writer threads. */ 168 for (i = 0; i < NUMWRITERS; i++) 169 (void)thread_create( 170 &writer_threads[i], NULL, writer_thread, (void *)dbp); 171 172 /* Join the writers */ 173 for (i = 0; i < NUMWRITERS; i++) 174 (void)thread_join(writer_threads[i], NULL); 175 176err: 177 /* Close our database handle, if it was opened. */ 178 if (dbp != NULL) { 179 ret_t = dbp->close(dbp, 0); 180 if (ret_t != 0) { 181 fprintf(stderr, "%s database close failed.\n", 182 db_strerror(ret_t)); 183 ret = ret_t; 184 } 185 } 186 187 /* Close our environment, if it was opened. */ 188 if (envp != NULL) { 189 ret_t = envp->close(envp, 0); 190 if (ret_t != 0) { 191 fprintf(stderr, "environment close failed: %s\n", 192 db_strerror(ret_t)); 193 ret = ret_t; 194 } 195 } 196 197 /* Final status message and return. */ 198 printf("I'm all done.\n"); 199 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE); 200} 201 202/* 203 * A function that performs a series of writes to a 204 * Berkeley DB database. The information written 205 * to the database is largely nonsensical, but the 206 * mechanism of transactional commit/abort and 207 * deadlock detection is illustrated here. 208 */ 209void * 210writer_thread(void *args) 211{ 212 static char *key_strings[] = { 213 "key 1", "key 2", "key 3", "key 4", "key 5", 214 "key 6", "key 7", "key 8", "key 9", "key 10" 215 }; 216 DB *dbp; 217 DB_ENV *envp; 218 DBT key, value; 219 DB_TXN *txn; 220 int i, j, payload, ret, thread_num; 221 int retry_count, max_retries = 20; /* Max retry on a deadlock */ 222 223 dbp = (DB *)args; 224 envp = dbp->get_env(dbp); 225 226 /* Get the thread number */ 227 (void)mutex_lock(&thread_num_lock); 228 global_thread_num++; 229 thread_num = global_thread_num; 230 (void)mutex_unlock(&thread_num_lock); 231 232 /* Initialize the random number generator */ 233 srand(thread_num); 234 235 /* Write 50 times and then quit */ 236 for (i = 0; i < 50; i++) { 237 retry_count = 0; /* Used for deadlock retries */ 238 239 /* 240 * Some think it is bad form to loop with a goto statement, but 241 * we do it anyway because it is the simplest and clearest way 242 * to achieve our abort/retry operation. 243 */ 244retry: 245 /* Begin our transaction. We group multiple writes in 246 * this thread under a single transaction so as to 247 * (1) show that you can atomically perform multiple writes 248 * at a time, and (2) to increase the chances of a 249 * deadlock occurring so that we can observe our 250 * deadlock detection at work. 251 * 252 * Normally we would want to avoid the potential for deadlocks, 253 * so for this workload the correct thing would be to perform our 254 * puts with autocommit. But that would excessively simplify our 255 * example, so we do the "wrong" thing here instead. 256 */ 257 ret = envp->txn_begin(envp, NULL, &txn, 0); 258 if (ret != 0) { 259 envp->err(envp, ret, "txn_begin failed"); 260 return ((void *)EXIT_FAILURE); 261 } 262 for (j = 0; j < 10; j++) { 263 /* Set up our key and values DBTs */ 264 memset(&key, 0, sizeof(DBT)); 265 key.data = key_strings[j]; 266 key.size = (u_int32_t)strlen(key_strings[j]) + 1; 267 268 memset(&value, 0, sizeof(DBT)); 269 payload = rand() + i; 270 value.data = &payload; 271 value.size = sizeof(int); 272 273 /* Perform the database put. */ 274 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) { 275 case 0: 276 break; 277 278 /* 279 * Here's where we perform deadlock detection. If 280 * DB_LOCK_DEADLOCK is returned by the put operation, 281 * then this thread has been chosen to break a deadlock. 282 * It must abort its operation, and optionally retry the 283 * put. 284 */ 285 case DB_LOCK_DEADLOCK: 286 /* 287 * First thing that we MUST do is abort the 288 * transaction. 289 */ 290 (void)txn->abort(txn); 291 /* 292 * Now we decide if we want to retry the operation. 293 * If we have retried less than max_retries, 294 * increment the retry count and goto retry. 295 */ 296 if (retry_count < max_retries) { 297 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n", 298 thread_num); 299 printf("Writer %i: Retrying write operation.\n", 300 thread_num); 301 retry_count++; 302 goto retry; 303 } 304 /* 305 * Otherwise, just give up. 306 */ 307 printf("Writer %i: ", thread_num); 308 printf("Got DB_LOCK_DEADLOCK and out of retries.\n"); 309 printf("Writer %i: Giving up.\n", thread_num); 310 return ((void *)EXIT_FAILURE); 311 /* 312 * If a generic error occurs, we simply abort the 313 * transaction and exit the thread completely. 314 */ 315 default: 316 envp->err(envp, ret, "db put failed"); 317 ret = txn->abort(txn); 318 if (ret != 0) 319 envp->err(envp, ret, "txn abort failed"); 320 return ((void *)EXIT_FAILURE); 321 } /** End case statement **/ 322 323 } /** End for loop **/ 324 325 /* 326 * print the number of records found in the database. 327 * See count_records() for usage information. 328 */ 329 printf("Thread %i. Record count: %i\n", thread_num, 330 count_records(dbp, txn)); 331 332 /* 333 * If all goes well, we can commit the transaction and 334 * exit the thread. 335 */ 336 ret = txn->commit(txn, 0); 337 if (ret != 0) { 338 envp->err(envp, ret, "txn commit failed"); 339 return ((void *)EXIT_FAILURE); 340 } 341 } 342 return ((void *)EXIT_SUCCESS); 343} 344 345/* 346 * This simply counts the number of records contained in the 347 * database and returns the result. You can use this function 348 * in three ways: 349 * 350 * First call it with an active txn handle (this is what the 351 * example currently does). 352 * 353 * Secondly, configure the cursor for uncommitted reads. 354 * 355 * Third, call count_records AFTER the writer has committed 356 * its transaction. 357 * 358 * If you do none of these things, the writer thread will 359 * self-deadlock. 360 * 361 * Note that this function exists only for illustrative purposes. 362 * A more straight-forward way to count the number of records in 363 * a database is to use DB->stat() or DB->stat_print(). 364 */ 365 366int 367count_records(DB *dbp, DB_TXN *txn) 368{ 369 DBT key, value; 370 DBC *cursorp; 371 int count, ret; 372 373 cursorp = NULL; 374 count = 0; 375 376 /* Get the cursor */ 377 ret = dbp->cursor(dbp, txn, &cursorp, 0); 378 if (ret != 0) { 379 dbp->err(dbp, ret, 380 "count_records: cursor open failed."); 381 goto cursor_err; 382 } 383 384 /* Get the key DBT used for the database read */ 385 memset(&key, 0, sizeof(DBT)); 386 memset(&value, 0, sizeof(DBT)); 387 do { 388 ret = cursorp->get(cursorp, &key, &value, DB_NEXT); 389 switch (ret) { 390 case 0: 391 count++; 392 break; 393 case DB_NOTFOUND: 394 break; 395 default: 396 dbp->err(dbp, ret, 397 "Count records unspecified error"); 398 goto cursor_err; 399 } 400 } while (ret == 0); 401 402cursor_err: 403 if (cursorp != NULL) { 404 ret = cursorp->close(cursorp); 405 if (ret != 0) { 406 dbp->err(dbp, ret, 407 "count_records: cursor close failed."); 408 } 409 } 410 411 return (count); 412} 413 414/* Open a Berkeley DB database */ 415int 416open_db(DB **dbpp, const char *progname, const char *file_name, 417 DB_ENV *envp, u_int32_t extra_flags) 418{ 419 int ret; 420 u_int32_t open_flags; 421 DB *dbp; 422 423 /* Initialize the DB handle */ 424 ret = db_create(&dbp, envp, 0); 425 if (ret != 0) { 426 fprintf(stderr, "%s: %s\n", progname, 427 db_strerror(ret)); 428 return (EXIT_FAILURE); 429 } 430 431 /* Point to the memory malloc'd by db_create() */ 432 *dbpp = dbp; 433 434 if (extra_flags != 0) { 435 ret = dbp->set_flags(dbp, extra_flags); 436 if (ret != 0) { 437 dbp->err(dbp, ret, 438 "open_db: Attempt to set extra flags failed."); 439 return (EXIT_FAILURE); 440 } 441 } 442 443 /* Now open the database */ 444 open_flags = DB_CREATE | /* Allow database creation */ 445 DB_THREAD | 446 DB_AUTO_COMMIT; /* Allow autocommit */ 447 448 ret = dbp->open(dbp, /* Pointer to the database */ 449 NULL, /* Txn pointer */ 450 file_name, /* File name */ 451 NULL, /* Logical db name */ 452 DB_BTREE, /* Database type (using btree) */ 453 open_flags, /* Open flags */ 454 0); /* File mode. Using defaults */ 455 456 if (ret != 0) { 457 dbp->err(dbp, ret, "Database open failed"); 458 return (EXIT_FAILURE); 459 } 460 return (EXIT_SUCCESS); 461} 462