// File TxnGuide.cpp
//
// Multi-threaded transactional writer example for the Berkeley DB C++ API.
// This preamble pulls in the headers and defines a thin portability layer
// (thread_t / mutex_t plus thread_* and mutex_* macros) over Win32 and
// POSIX threads so the rest of the file can be written once.

#include <iostream>
#include <db_cxx.h>

#ifdef _WIN32
#include <windows.h>
// getopt()/optarg are declared by hand on Windows; <unistd.h> is only
// included on the non-Windows branch below.
extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}
// Path delimiter for this platform.
#define PATHD '\\'

// Win32 threads. Both macros evaluate to 0 on success and -1 on failure.
// `attr` is accepted for signature parity with the POSIX form and ignored.
typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// Waits for the thread to finish, then stores its exit code through
// `statusp` (reinterpreted as an LPDWORD).
// NOTE(review): `statusp` is handed to GetExitCodeThread unchecked;
// presumably it must be non-NULL on this platform -- confirm.
#define thread_join(thr, statusp)                                          \
    ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&            \
    GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)

// Win32 mutexes. Each macro evaluates to 0 on success and -1 on failure.
// `attr` is ignored by mutex_init.
typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
// Path delimiter for this platform.
#define PATHD '/'

// POSIX threads: the macros forward directly to the pthread_* calls, so
// each one evaluates to whatever the underlying pthread function returns.
typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

// POSIX mutexes: direct forwarding to pthread_mutex_*.
typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock(m)
#define mutex_unlock(m)         pthread_mutex_unlock(m)
#endif

// Run 5 writer threads at a time.
#define NUMWRITERS 5

// Printing of thread_t is implementation-specific, so we
// create our own thread IDs for reporting purposes.
49int global_thread_num; 50mutex_t thread_num_lock; 51 52// Forward declarations 53int countRecords(Db *, DbTxn *); 54int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t); 55int usage(void); 56void *writerThread(void *); 57 58// Usage function 59int 60usage() 61{ 62 std::cerr << " [-h <database_home_directory>]" << std::endl; 63 return (EXIT_FAILURE); 64} 65 66int 67main(int argc, char *argv[]) 68{ 69 // Initialize our handles 70 Db *dbp = NULL; 71 DbEnv *envp = NULL; 72 73 thread_t writerThreads[NUMWRITERS]; 74 int ch, i; 75 u_int32_t envFlags; 76 char *dbHomeDir; 77 extern char *optarg; 78 79 // Application name 80 const char *progName = "TxnGuide"; 81 82 // Database file name 83 const char *fileName = "mydb.db"; 84 85 // Parse the command line arguments 86#ifdef _WIN32 87 dbHomeDir = ".\\"; 88#else 89 dbHomeDir = "./"; 90#endif 91 while ((ch = getopt(argc, argv, "h:")) != EOF) 92 switch (ch) { 93 case 'h': 94 dbHomeDir = optarg; 95 break; 96 case '?': 97 default: 98 return (usage()); 99 } 100 101 102 // Env open flags 103 envFlags = 104 DB_CREATE | // Create the environment if it does not exist 105 DB_RECOVER | // Run normal recovery. 106 DB_INIT_LOCK | // Initialize the locking subsystem 107 DB_INIT_LOG | // Initialize the logging subsystem 108 DB_INIT_TXN | // Initialize the transactional subsystem. This 109 // also turns on logging. 110 DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache) 111 DB_THREAD; // Cause the environment to be free-threaded 112 113 try { 114 // Create and open the environment 115 envp = new DbEnv(0); 116 117 // Indicate that we want db to internally perform deadlock 118 // detection. Also indicate that the transaction with 119 // the fewest number of write locks will receive the 120 // deadlock notification in the event of a deadlock. 
121 envp->set_lk_detect(DB_LOCK_MINWRITE); 122 123 envp->open(dbHomeDir, envFlags, 0); 124 125 126 // If we had utility threads (for running checkpoints or 127 // deadlock detection, for example) we would spawn those 128 // here. However, for a simple example such as this, 129 // that is not required. 130 131 // Open the database 132 openDb(&dbp, progName, fileName, 133 envp, DB_DUPSORT); 134 135 // Initialize a mutex. Used to help provide thread ids. 136 (void)mutex_init(&thread_num_lock, NULL); 137 138 // Start the writer threads. 139 for (i = 0; i < NUMWRITERS; i++) 140 (void)thread_create( 141 &writerThreads[i], NULL, 142 writerThread, (void *)dbp); 143 144 // Join the writers 145 for (i = 0; i < NUMWRITERS; i++) 146 (void)thread_join(writerThreads[i], NULL); 147 148 } catch(DbException &e) { 149 std::cerr << "Error opening database environment: " 150 << dbHomeDir << std::endl; 151 std::cerr << e.what() << std::endl; 152 return (EXIT_FAILURE); 153 } 154 155 try { 156 // Close our database handle if it was opened. 157 if (dbp != NULL) 158 dbp->close(0); 159 160 // Close our environment if it was opened. 161 if (envp != NULL) 162 envp->close(0); 163 } catch(DbException &e) { 164 std::cerr << "Error closing database and environment." 165 << std::endl; 166 std::cerr << e.what() << std::endl; 167 return (EXIT_FAILURE); 168 } 169 170 // Final status message and return. 171 172 std::cout << "I'm all done." << std::endl; 173 return (EXIT_SUCCESS); 174} 175 176// A function that performs a series of writes to a 177// Berkeley DB database. The information written 178// to the database is largely nonsensical, but the 179// mechanism of transactional commit/abort and 180// deadlock detection is illustrated here. 
181void * 182writerThread(void *args) 183{ 184 int j, thread_num; 185 int max_retries = 20; // Max retry on a deadlock 186 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4", 187 "key 5", "key 6", "key 7", "key 8", 188 "key 9", "key 10"}; 189 190 Db *dbp = (Db *)args; 191 DbEnv *envp = dbp->get_env(); 192 193 // Get the thread number 194 (void)mutex_lock(&thread_num_lock); 195 global_thread_num++; 196 thread_num = global_thread_num; 197 (void)mutex_unlock(&thread_num_lock); 198 199 // Initialize the random number generator 200 srand(thread_num); 201 202 // Perform 50 transactions 203 for (int i=0; i<50; i++) { 204 DbTxn *txn; 205 bool retry = true; 206 int retry_count = 0; 207 // while loop is used for deadlock retries 208 while (retry) { 209 // try block used for deadlock detection and 210 // general db exception handling 211 try { 212 213 // Begin our transaction. We group multiple writes in 214 // this thread under a single transaction so as to 215 // (1) show that you can atomically perform multiple 216 // writes at a time, and (2) to increase the chances 217 // of a deadlock occurring so that we can observe our 218 // deadlock detection at work. 219 220 // Normally we would want to avoid the potential for 221 // deadlocks, so for this workload the correct thing 222 // would be to perform our puts with autocommit. But 223 // that would excessively simplify our example, so we 224 // do the "wrong" thing here instead. 225 txn = NULL; 226 envp->txn_begin(NULL, &txn, 0); 227 228 // Perform the database write for this transaction. 229 for (j = 0; j < 10; j++) { 230 Dbt key, value; 231 key.set_data(key_strings[j]); 232 key.set_size((u_int32_t)strlen(key_strings[j]) + 1); 233 234 int payload = rand() + i; 235 value.set_data(&payload); 236 value.set_size(sizeof(int)); 237 238 // Perform the database put 239 dbp->put(txn, &key, &value, 0); 240 } 241 242 // countRecords runs a cursor over the entire database. 
243 // We do this to illustrate issues of deadlocking 244 std::cout << thread_num << " : Found " 245 << countRecords(dbp, NULL) 246 << " records in the database." << std::endl; 247 248 std::cout << thread_num << " : committing txn : " << i 249 << std::endl; 250 251 // commit 252 try { 253 txn->commit(0); 254 retry = false; 255 txn = NULL; 256 } catch (DbException &e) { 257 std::cout << "Error on txn commit: " 258 << e.what() << std::endl; 259 } 260 } catch (DbDeadlockException &) { 261 // First thing that we MUST do is abort the transaction. 262 if (txn != NULL) 263 (void)txn->abort(); 264 265 // Now we decide if we want to retry the operation. 266 // If we have retried less than max_retries, 267 // increment the retry count and goto retry. 268 if (retry_count < max_retries) { 269 std::cout << "############### Writer " << thread_num 270 << ": Got DB_LOCK_DEADLOCK.\n" 271 << "Retrying write operation." 272 << std::endl; 273 retry_count++; 274 retry = true; 275 } else { 276 // Otherwise, just give up. 277 std::cerr << "Writer " << thread_num 278 << ": Got DeadLockException and out of " 279 << "retries. Giving up." << std::endl; 280 retry = false; 281 } 282 } catch (DbException &e) { 283 std::cerr << "db put failed" << std::endl; 284 std::cerr << e.what() << std::endl; 285 if (txn != NULL) 286 txn->abort(); 287 retry = false; 288 } catch (std::exception &ee) { 289 std::cerr << "Unknown exception: " << ee.what() << std::endl; 290 return (0); 291 } 292 } 293 } 294 return (0); 295} 296 297 298// This simply counts the number of records contained in the 299// database and returns the result. You can use this method 300// in three ways: 301// 302// First call it with an active txn handle. 303// Secondly, configure the cursor for uncommitted reads 304// 305// Third, call countRecords AFTER the writer has committed 306// its transaction. 307// 308// If you do none of these things, the writer thread will 309// self-deadlock. 
310// 311// Note that this method exists only for illustrative purposes. 312// A more straight-forward way to count the number of records in 313// a database is to use the Database.getStats() method. 314int 315countRecords(Db *dbp, DbTxn *txn) 316{ 317 318 Dbc *cursorp = NULL; 319 int count = 0; 320 321 try { 322 // Get the cursor 323 dbp->cursor(txn, &cursorp, DB_READ_UNCOMMITTED); 324 325 Dbt key, value; 326 while (cursorp->get(&key, &value, DB_NEXT) == 0) { 327 count++; 328 } 329 } catch (DbDeadlockException &de) { 330 std::cerr << "countRecords: got deadlock" << std::endl; 331 cursorp->close(); 332 throw de; 333 } catch (DbException &e) { 334 std::cerr << "countRecords error:" << std::endl; 335 std::cerr << e.what() << std::endl; 336 } 337 338 if (cursorp != NULL) { 339 try { 340 cursorp->close(); 341 } catch (DbException &e) { 342 std::cerr << "countRecords: cursor close failed:" << std::endl; 343 std::cerr << e.what() << std::endl; 344 } 345 } 346 347 return (count); 348} 349 350 351// Open a Berkeley DB database 352int 353openDb(Db **dbpp, const char *progname, const char *fileName, 354 DbEnv *envp, u_int32_t extraFlags) 355{ 356 int ret; 357 u_int32_t openFlags; 358 359 try { 360 Db *dbp = new Db(envp, 0); 361 362 // Point to the new'd Db 363 *dbpp = dbp; 364 365 if (extraFlags != 0) 366 ret = dbp->set_flags(extraFlags); 367 368 // Now open the database */ 369 openFlags = DB_CREATE | // Allow database creation 370 DB_READ_UNCOMMITTED | // Allow uncommitted reads 371 DB_AUTO_COMMIT; // Allow autocommit 372 373 dbp->open(NULL, // Txn pointer 374 fileName, // File name 375 NULL, // Logical db name 376 DB_BTREE, // Database type (using btree) 377 openFlags, // Open flags 378 0); // File mode. Using defaults 379 } catch (DbException &e) { 380 std::cerr << progname << ": openDb: db open failed:" << std::endl; 381 std::cerr << e.what() << std::endl; 382 return (EXIT_FAILURE); 383 } 384 385 return (EXIT_SUCCESS); 386} 387 388