<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
    <title>Transaction Example</title>
    <link rel="stylesheet" href="gettingStarted.css" type="text/css" />
    <meta name="generator" content="DocBook XSL Stylesheets V1.73.2" />
    <link rel="start" href="index.html" title="Getting Started with Berkeley DB Transaction Processing" />
    <link rel="up" href="wrapup.html" title="Chapter 6. Summary and Examples" />
    <link rel="prev" href="wrapup.html" title="Chapter 6. Summary and Examples" />
    <link rel="next" href="inmem_txnexample_c.html" title="In-Memory Transaction Example" />
  </head>
  <body>
    <div class="navheader">
      <table width="100%" summary="Navigation header">
        <tr>
          <th colspan="3" align="center">Transaction Example</th>
        </tr>
        <tr>
          <td width="20%" align="left"><a accesskey="p" href="wrapup.html">Prev</a> </td>
          <th width="60%" align="center">Chapter 6. Summary and Examples</th>
          <td width="20%" align="right"> <a accesskey="n" href="inmem_txnexample_c.html">Next</a></td>
        </tr>
      </table>
      <hr />
    </div>
    <div class="sect1" lang="en" xml:lang="en">
      <div class="titlepage">
        <div>
          <div>
            <h2 class="title" style="clear: both"><a id="txnexample_c"></a>Transaction Example</h2>
          </div>
        </div>
      </div>
      <p>
        The following code provides a fully functional example of a
        multi-threaded transactional DB application. For improved
        portability across platforms, this example uses pthreads to
        provide threading support.
      </p>
      <p>
        The example opens an environment and database and then creates 5
        threads, each of which writes 500 records to the database. The keys
        used for these writes are pre-determined strings, while the data is
        a random value.
        This means that the actual data is arbitrary and
        therefore uninteresting; we picked it only because it requires
        minimal code to implement and therefore stays out of the way of
        the main points of this example.
      </p>
      <p>
        Each thread writes 10 records under a single transaction
        before committing and writing another 10 (this is repeated 50
        times). At the end of each transaction, but before committing, each
        thread calls a function that uses a cursor to read every record in
        the database. We do this in order to make some points about
        database reads in a transactional environment.
      </p>
      <p>
        Of course, each writer thread performs deadlock detection as
        described in this manual. In addition, normal recovery is performed
        when the environment is opened.
      </p>
      <p>
        We start with our normal <code class="literal">include</code> directives:
      </p>
      <pre class="programlisting">// File TxnGuide.cpp

// We assume an ANSI-compatible compiler
#include &lt;db_cxx.h&gt;
#include &lt;pthread.h&gt;
#include &lt;cstdlib&gt;   // EXIT_SUCCESS, EXIT_FAILURE, rand(), srand()
#include &lt;cstring&gt;   // strlen()
#include &lt;iostream&gt;

#ifdef _WIN32
extern int getopt(int, char * const *, const char *);
#else
#include &lt;unistd.h&gt;
#endif </pre>
      <p>
        We also need a directive that we use to identify how many threads we
        want our program to create:
      </p>
      <pre class="programlisting">// Run 5 writer threads at a time.
#define NUMWRITERS 5 </pre>
      <p>
        Next we declare a couple of global
        variables (used by our threads), and we provide our forward
        declarations for the functions used by this example.
      </p>
      <pre class="programlisting">// Printing of pthread_t is implementation-specific, so we
// create our own thread IDs for reporting purposes.
int global_thread_num;
pthread_mutex_t thread_num_lock;

// Forward declarations
int countRecords(Db *, DbTxn *);
int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
int usage(void);
void *writerThread(void *); </pre>
      <p>
        We now implement our usage function, which identifies our only command line
        parameter:
      </p>
      <pre class="programlisting">// Usage function
int
usage()
{
    std::cerr &lt;&lt; " [-h &lt;database_home_directory&gt;]" &lt;&lt; std::endl;
    return (EXIT_FAILURE);
} </pre>
      <p>
        With that, we have finished up our program's housekeeping, and we can
        now move on to the main part of our program. As usual, we begin with
        <code class="function">main()</code>. First we declare all our variables, and
        then we initialize our DB handles.
      </p>
      <pre class="programlisting">int
main(int argc, char *argv[])
{
    // Initialize our handles
    Db *dbp = NULL;
    DbEnv *envp = NULL;

    pthread_t writerThreads[NUMWRITERS];
    int ch, i;
    u_int32_t envFlags;
    const char *dbHomeDir;   // May point at a string literal

    // Application name
    const char *progName = "TxnGuide";

    // Database file name
    const char *fileName = "mydb.db"; </pre>
      <p>
        Now we need to parse our command line. In this case, all we want is to
        know where our environment directory is. If the <code class="literal">-h</code>
        option is not provided when this example is run, the current working
        directory is used instead.
      </p>
      <pre class="programlisting">    // Parse the command line arguments
#ifdef _WIN32
    dbHomeDir = ".\\";
#else
    dbHomeDir = "./";
#endif
    // getopt() returns -1 once all options have been parsed
    while ((ch = getopt(argc, argv, "h:")) != -1)
        switch (ch) {
        case 'h':
            dbHomeDir = optarg;
            break;
        case '?':
        default:
            return (usage());
        } </pre>
      <p>
        Next we create our environment handle, and we define our environment open flags.
        There are a few things to notice here:
      </p>
      <div class="itemizedlist">
        <ul type="disc">
          <li>
            <p>
              We specify <code class="literal">DB_RECOVER</code>, which means that normal
              recovery is run every time we start the application. This is
              highly desirable and recommended for most
              applications.
            </p>
          </li>
          <li>
            <p>
              We also specify <code class="literal">DB_THREAD</code>, which means our
              environment handle will be free-threaded. This is very
              important because we will be sharing the environment handle
              across threads.
            </p>
          </li>
        </ul>
      </div>
      <pre class="programlisting">    // Env open flags
    envFlags =
      DB_CREATE     |  // Create the environment if it does not exist
      DB_RECOVER    |  // Run normal recovery.
      DB_INIT_LOCK  |  // Initialize the locking subsystem
      DB_INIT_LOG   |  // Initialize the logging subsystem
      DB_INIT_TXN   |  // Initialize the transactional subsystem. This
                       // also turns on logging.
      DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
      DB_THREAD;       // Cause the environment to be free-threaded

    try {
        // Create and open the environment
        envp = new DbEnv(0); </pre>
      <p>
        Now we configure how we want deadlock detection performed. In our case, we cause DB to perform deadlock
        detection by walking its internal lock tables, looking for a deadlock, every time a lock request blocks.
        Further, in the event of a deadlock, the transaction that holds the fewest write locks will receive the
        deadlock notification.
      </p>
      <pre class="programlisting">        // Indicate that we want db to internally perform deadlock
        // detection.  Also indicate that the transaction with
        // the fewest number of write locks will receive the
        // deadlock notification in the event of a deadlock.
        envp-&gt;set_lk_detect(DB_LOCK_MINWRITE); </pre>
      <p>
        Now we open our environment.
      </p>
      <pre class="programlisting">        // If we had utility threads (for running checkpoints or
        // deadlock detection, for example) we would spawn those
        // here. However, for a simple example such as this,
        // that is not required.

        envp-&gt;open(dbHomeDir, envFlags, 0); </pre>
      <p>
        Now we call the function that will open our database for us. This is
        not very interesting, except that you will notice that we are
        specifying <code class="literal">DB_DUPSORT</code>. This is required purely by
        the data that we are writing to the database, and it is only necessary
        if you run the application more than once without first deleting the environment.
      </p>
      <p>
        The implementation of <code class="function">openDb()</code> is described
        later in this section.
      </p>
      <pre class="programlisting">        // Open the database
        openDb(&amp;dbp, progName, fileName, envp, DB_DUPSORT); </pre>
      <p>
        Now we create our threads. In this example we are using pthreads
        for our threading package. A general description of threading,
        apart from how it affects DB usage, is beyond the scope of this manual.
        However, the things that we are doing here should be familiar to
        anyone who has prior experience with any threading package. We are
        simply initializing a mutex, creating our threads, and then joining
        our threads, which causes our program to wait until the joined
        threads have completed before continuing operations in the main
        thread.
      </p>
      <pre class="programlisting">        // Initialize a pthread mutex. Used to help provide thread ids.
        (void)pthread_mutex_init(&amp;thread_num_lock, NULL);

        // Start the writer threads.
        for (i = 0; i &lt; NUMWRITERS; i++)
            (void)pthread_create(&amp;writerThreads[i], NULL,
                writerThread, (void *)dbp);

        // Join the writers
        for (i = 0; i &lt; NUMWRITERS; i++)
            (void)pthread_join(writerThreads[i], NULL);

    } catch(DbException &amp;e) {
        std::cerr &lt;&lt; "Error opening database environment: "
                  &lt;&lt; dbHomeDir &lt;&lt; std::endl;
        std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
        return (EXIT_FAILURE);
    } </pre>
      <p>
        Finally, to wrap up <code class="function">main()</code>, we close out our
        database and environment handles, as is normal for any DB
        application. Notice that this cleanup code is reached only if the
        code above completes without throwing. If creating, configuring, or
        opening the environment throws a
        <code class="literal">DbException</code>, the catch block above
        reports the error and returns from
        <code class="function">main()</code> immediately.
      </p>
      <pre class="programlisting">    try {
        // Close our database handle if it was opened.
        if (dbp != NULL)
            dbp-&gt;close(0);

        // Close our environment if it was opened.
        if (envp != NULL)
            envp-&gt;close(0);
    } catch(DbException &amp;e) {
        std::cerr &lt;&lt; "Error closing database and environment."
                  &lt;&lt; std::endl;
        std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
        return (EXIT_FAILURE);
    }

    // Final status message and return.

    std::cout &lt;&lt; "I'm all done." &lt;&lt; std::endl;
    return (EXIT_SUCCESS);
} </pre>
      <p>
        Now that we have completed <code class="function">main()</code>, we need to
        implement the function that our writer threads will actually run. This
        is where the bulk of our transactional code resides.
      </p>
      <p>
        We start as usual with variable declarations and initialization.
      </p>
      <pre class="programlisting">// A function that performs a series of writes to a
// Berkeley DB database.
// The information written to the database is largely
// nonsensical, but the mechanisms of transactional
// commit/abort and deadlock detection are illustrated here.
void *
writerThread(void *args)
{
    int j, thread_num;
    int max_retries = 20;   // Max retry on a deadlock
    char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
                           "key 5", "key 6", "key 7", "key 8",
                           "key 9", "key 10"};

    Db *dbp = (Db *)args;
    DbEnv *envp = dbp-&gt;get_env(); </pre>
      <p>
        Now we want a thread number for reporting purposes. It is possible to
        use the <code class="literal">pthread_t</code> value directly for this purpose,
        but how that is done unfortunately differs depending
        on the pthread implementation you are using. So instead we use a
        mutex-protected global variable to obtain a simple integer for
        our reporting purposes.
      </p>
      <p>
        Note that we also use this thread number to seed the random number generator, which we do here.
        We use this random number generator for data generation.
      </p>
      <pre class="programlisting">    // Get the thread number
    (void)pthread_mutex_lock(&amp;thread_num_lock);
    global_thread_num++;
    thread_num = global_thread_num;
    (void)pthread_mutex_unlock(&amp;thread_num_lock);

    // Initialize the random number generator. We seed it with our
    // own thread number because casting a pthread_t to an integer
    // is not portable.
    srand((unsigned int)thread_num); </pre>
      <p>
        Now we begin the loop that we use to write data to the database.

        <span>
        Notice that at the top of this loop, we begin a new transaction.
        </span>

        We will actually use 50 transactions per writer
        thread, although we will only ever have one active transaction per
        thread at a time. Within each transaction, we will perform 10
        database writes.
      </p>
      <p>
        By combining multiple writes together under a single transaction,
        we increase the likelihood that a deadlock will occur.
        Normally,
        you want to reduce the potential for a deadlock, and in this case
        the way to do that is to perform a single write per transaction.
        To avoid deadlocks, we could use auto commit to
        write to our database for this workload.
      </p>
      <p>
        However, we want to show deadlock handling, and by performing
        multiple writes per transaction we can actually observe deadlocks
        occurring. We also want to underscore the idea that you can
        combine multiple database operations together in a single atomic
        unit of work in order to improve the efficiency of your writes.
      </p>
      <pre class="programlisting">    // Perform 50 transactions
    for (int i=0; i&lt;50; i++) {
        DbTxn *txn;
        bool retry = true;
        int retry_count = 0;
        // while loop is used for deadlock retries
        while (retry) {
            // try block used for deadlock detection and
            // general db exception handling
            try {

                // Begin our transaction. We group multiple writes in
                // this thread under a single transaction so as to
                // (1) show that you can atomically perform multiple
                // writes at a time, and (2) to increase the chances
                // of a deadlock occurring so that we can observe our
                // deadlock detection at work.

                // Normally we would want to avoid the potential for
                // deadlocks, so for this workload the correct thing
                // would be to perform our puts with auto commit. But
                // that would excessively simplify our example, so we
                // do the "wrong" thing here instead.
                txn = NULL;
                envp-&gt;txn_begin(NULL, &amp;txn, 0); </pre>
      <p>
        Now we begin the inner loop that we use to actually
        perform the write.
      </p>
      <pre class="programlisting">                // Perform the database write for this transaction.
                for (j = 0; j &lt; 10; j++) {
                    Dbt key, value;
                    key.set_data(key_strings[j]);
                    key.set_size((strlen(key_strings[j]) + 1) *
                        sizeof(char));

                    int payload = rand() + i;
                    value.set_data(&amp;payload);
                    value.set_size(sizeof(int));

                    // Perform the database put
                    dbp-&gt;put(txn, &amp;key, &amp;value, 0);
                } </pre>
      <p>
        Having completed the inner database write loop, we could simply
        commit the transaction and continue on to the next block of 10
        writes. However, we first want to illustrate a few points about
        transactional processing, so instead we call our

        <code class="function">countRecords()</code>

        function before calling the transaction
        commit.

        <code class="function">countRecords()</code>
        uses a cursor to read every
        record in the database and return a count of the number of records
        that it found.
      </p>
      <pre class="programlisting">                // countRecords runs a cursor over the entire database.
                // We do this to illustrate issues of deadlocking
                std::cout &lt;&lt; thread_num &lt;&lt; " : Found "
                          &lt;&lt; countRecords(dbp, NULL)
                          &lt;&lt; " records in the database." &lt;&lt; std::endl;

                std::cout &lt;&lt; thread_num &lt;&lt; " : committing txn : " &lt;&lt; i
                          &lt;&lt; std::endl;

                // commit
                try {
                    txn-&gt;commit(0);
                    retry = false;
                    txn = NULL;
                } catch (DbException &amp;e) {
                    std::cout &lt;&lt; "Error on txn commit: "
                              &lt;&lt; e.what() &lt;&lt; std::endl;
                } </pre>
      <p>
        Finally, we finish our try block. Notice how we examine the
        exceptions to determine whether we need to
        abort (or abort/retry in the case of a deadlock) our current
        transaction.
      </p>
      <pre class="programlisting">            } catch (DbDeadlockException &amp;de) {
                // First thing we MUST do is abort the transaction.
                if (txn != NULL)
                    (void)txn-&gt;abort();

                // Now we decide if we want to retry the operation.
                // If we have retried fewer than max_retries times,
                // increment the retry count and retry.
                if (retry_count &lt; max_retries) {
                    std::cout &lt;&lt; "############### Writer " &lt;&lt; thread_num
                              &lt;&lt; ": Got DB_LOCK_DEADLOCK.\n"
                              &lt;&lt; "Retrying write operation."
                              &lt;&lt; std::endl;
                    retry_count++;
                    retry = true;
                } else {
                    // Otherwise, just give up.
                    std::cerr &lt;&lt; "Writer " &lt;&lt; thread_num
                              &lt;&lt; ": Got DeadLockException and out of "
                              &lt;&lt; "retries. Giving up." &lt;&lt; std::endl;
                    retry = false;
                }
            } catch (DbException &amp;e) {
                std::cerr &lt;&lt; "db put failed" &lt;&lt; std::endl;
                std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
                if (txn != NULL)
                    txn-&gt;abort();
                retry = false;
            } catch (std::exception &amp;ee) {
                std::cerr &lt;&lt; "Unknown exception: " &lt;&lt; ee.what() &lt;&lt; std::endl;
                return (0);
            }
        }
    }
    return (0);
} </pre>
      <p>
        <span>
        We want to back up for a moment and take a look at the call to <code class="function">countRecords()</code>.
        </span>
        If you look at the

        <code class="function">countRecords()</code>
        function prototype at the beginning of this example, you will see that the
        function's second parameter takes a transaction handle. However,
        our usage of the function here does not pass a transaction handle
        through to the function.
      </p>
      <p>

        Because

        <code class="function">countRecords()</code>
        reads every record in the database, if used incorrectly the thread
        will self-deadlock. The writer thread has just written 10 records
        to the database, but because the transaction used for those writes
        has not yet been committed, each of those 10 records is still
        locked by the thread's transaction. If we then simply run a
        non-transactional cursor over the database from within the same
        thread that has locked those records, the cursor will
        block when it tries to read one of those transactionally
        protected records. The thread immediately stops operation at that
        point while the cursor waits for the read lock it has
        requested.
        Because that read lock will never be granted (the thread
        can never make enough forward progress to commit its transaction
        and release its write locks), this represents a
        self-deadlock for the thread.
      </p>
      <p>
        There are three ways to prevent this self-deadlock:
      </p>
      <div class="orderedlist">
        <ol type="1">
          <li>
            <p>
              We can move the call to

              <code class="function">countRecords()</code>
              to a point after the thread's transaction has committed.
            </p>
          </li>
          <li>
            <p>
              We can allow

              <code class="function">countRecords()</code>
              to operate under the same transaction as the one under
              which the writes were performed (this is what the
              transaction parameter for the function is for).
            </p>
          </li>
          <li>
            <p>
              We can reduce our isolation guarantee for the application
              by allowing uncommitted reads.
            </p>
          </li>
        </ol>
      </div>
      <p>
        For this example, we choose to use option 3 (uncommitted reads) to avoid
        the deadlock. This means that we have to open our database such
        that it supports uncommitted reads, and we have to open our cursor handle
        so that it knows to perform uncommitted reads.
      </p>
      <p>
        Note that in <a class="xref" href="inmem_txnexample_c.html" title="In-Memory Transaction Example">In-Memory Transaction Example</a>,
        we simply perform the cursor operation using the same transaction
        as is used for the thread's writes.
      </p>
      <p>
        The following is the

        <code class="function">countRecords()</code>
        implementation. There is nothing particularly interesting
        about this function other than specifying uncommitted reads when
        we open the cursor handle, but we include the function here anyway
        for the sake of completeness.
      </p>
      <pre class="programlisting">// This simply counts the number of records contained in the
// database and returns the result.
//
// Note that this method exists only for illustrative purposes.
// A more straightforward way to count the number of records in
// a database is to use the Db::stat() method.
int
countRecords(Db *dbp, DbTxn *txn)
{

    Dbc *cursorp = NULL;
    int count = 0;

    try {
        // Get the cursor
        dbp-&gt;cursor(txn, &amp;cursorp, DB_READ_UNCOMMITTED);

        Dbt key, value;
        while (cursorp-&gt;get(&amp;key, &amp;value, DB_NEXT) == 0) {
            count++;
        }
    } catch (DbDeadlockException &amp;de) {
        std::cerr &lt;&lt; "countRecords: got deadlock" &lt;&lt; std::endl;
        // The cursor may not have been opened if cursor() itself threw.
        if (cursorp != NULL)
            cursorp-&gt;close();
        // Rethrow the original exception so the caller can abort and retry.
        throw;
    } catch (DbException &amp;e) {
        std::cerr &lt;&lt; "countRecords error:" &lt;&lt; std::endl;
        std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
    }

    if (cursorp != NULL) {
        try {
            cursorp-&gt;close();
        } catch (DbException &amp;e) {
            std::cerr &lt;&lt; "countRecords: cursor close failed:" &lt;&lt; std::endl;
            std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
        }
    }

    return (count);
} </pre>
      <p>
        Finally, we provide the implementation of our

        <code class="function">openDb()</code>
        function. This function should hold
        no surprises for you. Note, however, that we do specify uncommitted reads
        when we open the database. If we did not do this, then our

        <code class="function">countRecords()</code>
        function would cause our
        thread to self-deadlock because the cursor could not be opened to
        support uncommitted reads (that flag on the cursor open would, in fact,
        be silently ignored by DB).
      </p>
      <pre class="programlisting">// Open a Berkeley DB database
int
openDb(Db **dbpp, const char *progname, const char *fileName,
  DbEnv *envp, u_int32_t extraFlags)
{
    int ret;
    u_int32_t openFlags;

    try {
        Db *dbp = new Db(envp, 0);

        // Point to the new'd Db
        *dbpp = dbp;

        if (extraFlags != 0)
            ret = dbp-&gt;set_flags(extraFlags);

        // Now open the database
        openFlags = DB_CREATE              | // Allow database creation
                    DB_READ_UNCOMMITTED    | // Allow uncommitted reads
                    DB_AUTO_COMMIT;          // Allow auto commit

        dbp-&gt;open(NULL,       // Txn pointer
                  fileName,   // File name
                  NULL,       // Logical db name
                  DB_BTREE,   // Database type (using btree)
                  openFlags,  // Open flags
                  0);         // File mode. Using defaults
    } catch (DbException &amp;e) {
        std::cerr &lt;&lt; progname &lt;&lt; ": openDb: db open failed:" &lt;&lt; std::endl;
        std::cerr &lt;&lt; e.what() &lt;&lt; std::endl;
        return (EXIT_FAILURE);
    }

    return (EXIT_SUCCESS);
} </pre>
      <p>
        This completes our transactional example. If you would like to
        experiment with this code, you can find the example in the following
        location in your DB distribution:
      </p>
      <pre class="programlisting"><span class="emphasis"><em>DB_INSTALL</em></span>/examples_cxx/txn_guide</pre>
    </div>
    <div class="navfooter">
      <hr />
      <table width="100%" summary="Navigation footer">
        <tr>
          <td width="40%" align="left"><a accesskey="p" href="wrapup.html">Prev</a> </td>
          <td width="20%" align="center">
            <a accesskey="u" href="wrapup.html">Up</a>
          </td>
          <td width="40%" align="right"> <a accesskey="n" href="inmem_txnexample_c.html">Next</a></td>
        </tr>
        <tr>
          <td width="40%" align="left" valign="top">Chapter 6.
Summary and Examples </td>
          <td width="20%" align="center">
            <a accesskey="h" href="index.html">Home</a>
          </td>
          <td width="40%" align="right" valign="top"> In-Memory Transaction Example</td>
        </tr>
      </table>
    </div>
  </body>
</html>