1/* File: txn_guide_inmemory.c */ 2 3/* We assume an ANSI-compatible compiler */ 4#include <stdio.h> 5#include <stdlib.h> 6#include <string.h> 7#include <db.h> 8 9#ifdef _WIN32 10#include <windows.h> 11#define PATHD '\\' 12extern int getopt(int, char * const *, const char *); 13extern char *optarg; 14 15typedef HANDLE thread_t; 16#define thread_create(thrp, attr, func, arg) \ 17 (((*(thrp) = CreateThread(NULL, 0, \ 18 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 19#define thread_join(thr, statusp) \ 20 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 21 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) 22 23typedef HANDLE mutex_t; 24#define mutex_init(m, attr) \ 25 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 26#define mutex_lock(m) \ 27 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 28#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 29#else 30#include <pthread.h> 31#include <unistd.h> 32#define PATHD '/' 33 34typedef pthread_t thread_t; 35#define thread_create(thrp, attr, func, arg) \ 36 pthread_create((thrp), (attr), (func), (arg)) 37#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 38 39typedef pthread_mutex_t mutex_t; 40#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 41#define mutex_lock(m) pthread_mutex_lock(m) 42#define mutex_unlock(m) pthread_mutex_unlock(m) 43#endif 44 45/* Run 5 writers threads at a time. */ 46#define NUMWRITERS 5 47 48/* 49 * Printing of a thread_t is implementation-specific, so we 50 * create our own thread IDs for reporting purposes. 51 */ 52int global_thread_num; 53mutex_t thread_num_lock; 54 55/* Forward declarations */ 56int count_records(DB *, DB_TXN *); 57int open_db(DB **, const char *, const char *, DB_ENV *, u_int32_t); 58void *writer_thread(void *); 59 60int 61main(void) 62{ 63 /* Initialize our handles */ 64 DB *dbp = NULL; 65 DB_ENV *envp = NULL; 66 67 thread_t writer_threads[NUMWRITERS]; 68 int i, ret, ret_t; 69 u_int32_t env_flags; 70 71 /* Application name */ 72 const char *prog_name = "txn_guide_inmemory"; 73 74 /* Create the environment */ 75 ret = db_env_create(&envp, 0); 76 if (ret != 0) { 77 fprintf(stderr, "Error creating environment handle: %s\n", 78 db_strerror(ret)); 79 goto err; 80 } 81 82 env_flags = 83 DB_CREATE | /* Create the environment if it does not exist */ 84 DB_INIT_LOCK | /* Initialize the locking subsystem */ 85 DB_INIT_LOG | /* Initialize the logging subsystem */ 86 DB_INIT_TXN | /* Initialize the transactional subsystem. This 87 * also turns on logging. */ 88 DB_INIT_MPOOL | /* Initialize the memory pool (in-memory cache) */ 89 DB_PRIVATE | /* Region files are backed by heap memory. */ 90 DB_THREAD; /* Cause the environment to be free-threaded */ 91 92 /* Specify in-memory logging */ 93 ret = envp->log_set_config(envp, DB_LOG_IN_MEMORY, 1); 94 if (ret != 0) { 95 fprintf(stderr, "Error setting log subsystem to in-memory: %s\n", 96 db_strerror(ret)); 97 goto err; 98 } 99 100 /* 101 * Specify the size of the in-memory log buffer. 102 */ 103 ret = envp->set_lg_bsize(envp, 10 * 1024 * 1024); 104 if (ret != 0) { 105 fprintf(stderr, "Error increasing the log buffer size: %s\n", 106 db_strerror(ret)); 107 goto err; 108 } 109 110 /* 111 * Specify the size of the in-memory cache. 112 */ 113 ret = envp->set_cachesize(envp, 0, 10 * 1024 * 1024, 1); 114 if (ret != 0) { 115 fprintf(stderr, "Error increasing the cache size: %s\n", 116 db_strerror(ret)); 117 goto err; 118 } 119 120 /* 121 * Indicate that we want db to perform lock detection internally. 122 * Also indicate that the transaction with the fewest number of 123 * write locks will receive the deadlock notification in 124 * the event of a deadlock. 125 */ 126 ret = envp->set_lk_detect(envp, DB_LOCK_MINWRITE); 127 if (ret != 0) { 128 fprintf(stderr, "Error setting lock detect: %s\n", 129 db_strerror(ret)); 130 goto err; 131 } 132 133 /* Now actually open the environment */ 134 ret = envp->open(envp, NULL, env_flags, 0); 135 if (ret != 0) { 136 fprintf(stderr, "Error opening environment: %s\n", 137 db_strerror(ret)); 138 goto err; 139 } 140 141 /* 142 * If we had utility threads (for running checkpoints or 143 * deadlock detection, for example) we would spawn those 144 * here. However, for a simple example such as this, 145 * that is not required. 146 */ 147 148 /* Open the database */ 149 ret = open_db(&dbp, prog_name, NULL, 150 envp, DB_DUPSORT); 151 if (ret != 0) 152 goto err; 153 154 /* Initialize a mutex. Used to help provide thread ids. */ 155 (void)mutex_init(&thread_num_lock, NULL); 156 157 /* Start the writer threads. */ 158 for (i = 0; i < NUMWRITERS; i++) 159 (void)thread_create( 160 &writer_threads[i], NULL, writer_thread, (void *)dbp); 161 162 /* Join the writers */ 163 for (i = 0; i < NUMWRITERS; i++) 164 (void)thread_join(writer_threads[i], NULL); 165 166err: 167 /* Close our database handle, if it was opened. */ 168 if (dbp != NULL) { 169 ret_t = dbp->close(dbp, 0); 170 if (ret_t != 0) { 171 fprintf(stderr, "%s database close failed.\n", 172 db_strerror(ret_t)); 173 ret = ret_t; 174 } 175 } 176 177 /* Close our environment, if it was opened. */ 178 if (envp != NULL) { 179 ret_t = envp->close(envp, 0); 180 if (ret_t != 0) { 181 fprintf(stderr, "environment close failed: %s\n", 182 db_strerror(ret_t)); 183 ret = ret_t; 184 } 185 } 186 187 /* Final status message and return. */ 188 printf("I'm all done.\n"); 189 return (ret == 0 ? EXIT_SUCCESS : EXIT_FAILURE); 190} 191 192/* 193 * A function that performs a series of writes to a 194 * Berkeley DB database. The information written 195 * to the database is largely nonsensical, but the 196 * mechanism of transactional commit/abort and 197 * deadlock detection is illustrated here. 198 */ 199void * 200writer_thread(void *args) 201{ 202 static char *key_strings[] = { 203 "key 1", "key 2", "key 3", "key 4", "key 5", 204 "key 6", "key 7", "key 8", "key 9", "key 10" 205 }; 206 DB *dbp; 207 DB_ENV *envp; 208 DBT key, value; 209 DB_TXN *txn; 210 int i, j, payload, ret, thread_num; 211 int retry_count, max_retries = 20; /* Max retry on a deadlock */ 212 213 dbp = (DB *)args; 214 envp = dbp->get_env(dbp); 215 216 /* Get the thread number */ 217 (void)mutex_lock(&thread_num_lock); 218 global_thread_num++; 219 thread_num = global_thread_num; 220 (void)mutex_unlock(&thread_num_lock); 221 222 /* Initialize the random number generator */ 223 srand(thread_num); 224 225 /* Write 50 times and then quit */ 226 for (i = 0; i < 50; i++) { 227 retry_count = 0; /* Used for deadlock retries */ 228 229 /* 230 * Some think it is bad form to loop with a goto statement, but 231 * we do it anyway because it is the simplest and clearest way 232 * to achieve our abort/retry operation. 233 */ 234retry: 235 /* Begin our transaction. We group multiple writes in 236 * this thread under a single transaction so as to 237 * (1) show that you can atomically perform multiple writes 238 * at a time, and (2) to increase the chances of a 239 * deadlock occurring so that we can observe our 240 * deadlock detection at work. 241 * 242 * Normally we would want to avoid the potential for deadlocks, 243 * so for this workload the correct thing would be to perform our 244 * puts with autocommit. But that would excessively simplify our 245 * example, so we do the "wrong" thing here instead. 246 */ 247 ret = envp->txn_begin(envp, NULL, &txn, 0); 248 if (ret != 0) { 249 envp->err(envp, ret, "txn_begin failed"); 250 return ((void *)EXIT_FAILURE); 251 } 252 for (j = 0; j < 10; j++) { 253 /* Set up our key and values DBTs */ 254 memset(&key, 0, sizeof(DBT)); 255 key.data = key_strings[j]; 256 key.size = (u_int32_t)strlen(key_strings[j]) + 1; 257 258 memset(&value, 0, sizeof(DBT)); 259 payload = rand() + i; 260 value.data = &payload; 261 value.size = sizeof(int); 262 263 /* Perform the database put. */ 264 switch (ret = dbp->put(dbp, txn, &key, &value, 0)) { 265 case 0: 266 break; 267 268 /* 269 * Here's where we perform deadlock detection. If 270 * DB_LOCK_DEADLOCK is returned by the put operation, 271 * then this thread has been chosen to break a deadlock. 272 * It must abort its operation, and optionally retry the 273 * put. 274 */ 275 case DB_LOCK_DEADLOCK: 276 /* 277 * First thing that we MUST do is abort the 278 * transaction. 279 */ 280 (void)txn->abort(txn); 281 /* 282 * Now we decide if we want to retry the operation. 283 * If we have retried less than max_retries, 284 * increment the retry count and goto retry. 285 */ 286 if (retry_count < max_retries) { 287 printf("Writer %i: Got DB_LOCK_DEADLOCK.\n", 288 thread_num); 289 printf("Writer %i: Retrying write operation.\n", 290 thread_num); 291 retry_count++; 292 goto retry; 293 } 294 /* 295 * Otherwise, just give up. 296 */ 297 printf("Writer %i: ", thread_num); 298 printf("Got DB_LOCK_DEADLOCK and out of retries.\n"); 299 printf("Writer %i: Giving up.\n", thread_num); 300 return ((void *)EXIT_FAILURE); 301 /* 302 * If a generic error occurs, we simply abort the 303 * transaction and exit the thread completely. 304 */ 305 default: 306 envp->err(envp, ret, "db put failed"); 307 ret = txn->abort(txn); 308 if (ret != 0) 309 envp->err(envp, ret, "txn abort failed"); 310 return ((void *)EXIT_FAILURE); 311 } /** End case statement **/ 312 313 } /** End for loop **/ 314 315 /* 316 * print the number of records found in the database. 317 * See count_records() for usage information. 318 */ 319 printf("Thread %i. Record count: %i\n", thread_num, 320 count_records(dbp, txn)); 321 322 /* 323 * If all goes well, we can commit the transaction and 324 * exit the thread. 325 */ 326 ret = txn->commit(txn, 0); 327 if (ret != 0) { 328 envp->err(envp, ret, "txn commit failed"); 329 return ((void *)EXIT_FAILURE); 330 } 331 } 332 return ((void *)EXIT_SUCCESS); 333} 334 335/* 336 * This simply counts the number of records contained in the 337 * database and returns the result. You can use this function 338 * in three ways: 339 * 340 * First call it with an active txn handle (this is what the 341 * example currently does). 342 * 343 * Secondly, configure the cursor for uncommitted reads. 344 * 345 * Third, call count_records AFTER the writer has committed 346 * its transaction. 347 * 348 * If you do none of these things, the writer thread will 349 * self-deadlock. 350 * 351 * Note that this function exists only for illustrative purposes. 352 * A more straight-forward way to count the number of records in 353 * a database is to use DB->stat() or DB->stat_print(). 354 */ 355 356int 357count_records(DB *dbp, DB_TXN *txn) 358{ 359 DBT key, value; 360 DBC *cursorp; 361 int count, ret; 362 363 cursorp = NULL; 364 count = 0; 365 366 /* Get the cursor */ 367 ret = dbp->cursor(dbp, txn, &cursorp, 0); 368 if (ret != 0) { 369 dbp->err(dbp, ret, 370 "count_records: cursor open failed."); 371 goto cursor_err; 372 } 373 374 /* Get the key DBT used for the database read */ 375 memset(&key, 0, sizeof(DBT)); 376 memset(&value, 0, sizeof(DBT)); 377 do { 378 ret = cursorp->get(cursorp, &key, &value, DB_NEXT); 379 switch (ret) { 380 case 0: 381 count++; 382 break; 383 case DB_NOTFOUND: 384 break; 385 default: 386 dbp->err(dbp, ret, 387 "Count records unspecified error"); 388 goto cursor_err; 389 } 390 } while (ret == 0); 391 392cursor_err: 393 if (cursorp != NULL) { 394 ret = cursorp->close(cursorp); 395 if (ret != 0) { 396 dbp->err(dbp, ret, 397 "count_records: cursor close failed."); 398 } 399 } 400 401 return (count); 402} 403 404/* Open a Berkeley DB database */ 405int 406open_db(DB **dbpp, const char *progname, const char *file_name, 407 DB_ENV *envp, u_int32_t extra_flags) 408{ 409 int ret; 410 u_int32_t open_flags; 411 DB *dbp; 412 413 /* Initialize the DB handle */ 414 ret = db_create(&dbp, envp, 0); 415 if (ret != 0) { 416 fprintf(stderr, "%s: %s\n", progname, 417 db_strerror(ret)); 418 return (EXIT_FAILURE); 419 } 420 421 /* Point to the memory malloc'd by db_create() */ 422 *dbpp = dbp; 423 424 if (extra_flags != 0) { 425 ret = dbp->set_flags(dbp, extra_flags); 426 if (ret != 0) { 427 dbp->err(dbp, ret, 428 "open_db: Attempt to set extra flags failed."); 429 return (EXIT_FAILURE); 430 } 431 } 432 433 /* Now open the database */ 434 open_flags = DB_CREATE | /* Allow database creation */ 435 DB_THREAD | 436 DB_AUTO_COMMIT; /* Allow autocommit */ 437 438 ret = dbp->open(dbp, /* Pointer to the database */ 439 NULL, /* Txn pointer */ 440 file_name, /* File name */ 441 NULL, /* Logical db name */ 442 DB_BTREE, /* Database type (using btree) */ 443 open_flags, /* Open flags */ 444 0); /* File mode. Using defaults */ 445 446 if (ret != 0) { 447 dbp->err(dbp, ret, "Database open failed"); 448 return (EXIT_FAILURE); 449 } 450 return (EXIT_SUCCESS); 451} 452