1// File TxnGuideInMemory.cpp 2 3#include <iostream> 4#include <db_cxx.h> 5 6#ifdef _WIN32 7#include <windows.h> 8#define PATHD '\\' 9extern "C" { 10 extern int getopt(int, char * const *, const char *); 11 extern char *optarg; 12} 13 14typedef HANDLE thread_t; 15#define thread_create(thrp, attr, func, arg) \ 16 (((*(thrp) = CreateThread(NULL, 0, \ 17 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 18#define thread_join(thr, statusp) \ 19 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 20 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) 21 22typedef HANDLE mutex_t; 23#define mutex_init(m, attr) \ 24 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 25#define mutex_lock(m) \ 26 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 27#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 28#else 29#include <pthread.h> 30#include <unistd.h> 31#define PATHD '/' 32 33typedef pthread_t thread_t; 34#define thread_create(thrp, attr, func, arg) \ 35 pthread_create((thrp), (attr), (func), (arg)) 36#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 37 38typedef pthread_mutex_t mutex_t; 39#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 40#define mutex_lock(m) pthread_mutex_lock(m) 41#define mutex_unlock(m) pthread_mutex_unlock(m) 42#endif 43 44// Run 5 writers threads at a time. 45#define NUMWRITERS 5 46 47// Printing of pthread_t is implementation-specific, so we 48// create our own thread IDs for reporting purposes. 49int global_thread_num; 50mutex_t thread_num_lock; 51 52// Forward declarations 53int countRecords(Db *, DbTxn *); 54int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t); 55int usage(void); 56void *writerThread(void *); 57 58int 59main(void) 60{ 61 // Initialize our handles 62 Db *dbp = NULL; 63 DbEnv *envp = NULL; 64 65 thread_t writerThreads[NUMWRITERS]; 66 int i; 67 u_int32_t envFlags; 68 69 // Application name 70 const char *progName = "TxnGuideInMemory"; 71 72 // Env open flags 73 envFlags = 74 DB_CREATE | // Create the environment if it does not exist 75 DB_RECOVER | // Run normal recovery. 76 DB_INIT_LOCK | // Initialize the locking subsystem 77 DB_INIT_LOG | // Initialize the logging subsystem 78 DB_INIT_TXN | // Initialize the transactional subsystem. This 79 // also turns on logging. 80 DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache) 81 DB_PRIVATE | // Region files are not backed by the filesystem. 82 // Instead, they are backed by heap memory. 83 DB_THREAD; // Cause the environment to be free-threaded 84 85 try { 86 // Create the environment 87 envp = new DbEnv(0); 88 89 // Specify in-memory logging 90 envp->log_set_config(DB_LOG_IN_MEMORY, 1); 91 92 // Specify the size of the in-memory log buffer. 93 envp->set_lg_bsize(10 * 1024 * 1024); 94 95 // Specify the size of the in-memory cache 96 envp->set_cachesize(0, 10 * 1024 * 1024, 1); 97 98 // Indicate that we want db to internally perform deadlock 99 // detection. Also indicate that the transaction with 100 // the fewest number of write locks will receive the 101 // deadlock notification in the event of a deadlock. 102 envp->set_lk_detect(DB_LOCK_MINWRITE); 103 104 // Open the environment 105 envp->open(NULL, envFlags, 0); 106 107 // If we had utility threads (for running checkpoints or 108 // deadlock detection, for example) we would spawn those 109 // here. However, for a simple example such as this, 110 // that is not required. 111 112 // Open the database 113 openDb(&dbp, progName, NULL, 114 envp, DB_DUPSORT); 115 116 // Initialize a mutex. Used to help provide thread ids. 117 (void)mutex_init(&thread_num_lock, NULL); 118 119 // Start the writer threads. 120 for (i = 0; i < NUMWRITERS; i++) 121 (void)thread_create( 122 &writerThreads[i], NULL, 123 writerThread, 124 (void *)dbp); 125 126 // Join the writers 127 for (i = 0; i < NUMWRITERS; i++) 128 (void)thread_join(writerThreads[i], NULL); 129 130 } catch(DbException &e) { 131 std::cerr << "Error opening database environment: " 132 << std::endl; 133 std::cerr << e.what() << std::endl; 134 return (EXIT_FAILURE); 135 } 136 137 try { 138 // Close our database handle if it was opened. 139 if (dbp != NULL) 140 dbp->close(0); 141 142 // Close our environment if it was opened. 143 if (envp != NULL) 144 envp->close(0); 145 } catch(DbException &e) { 146 std::cerr << "Error closing database and environment." 147 << std::endl; 148 std::cerr << e.what() << std::endl; 149 return (EXIT_FAILURE); 150 } 151 152 // Final status message and return. 153 154 std::cout << "I'm all done." << std::endl; 155 return (EXIT_SUCCESS); 156} 157 158// A function that performs a series of writes to a 159// Berkeley DB database. The information written 160// to the database is largely nonsensical, but the 161// mechanism of transactional commit/abort and 162// deadlock detection is illustrated here. 163void * 164writerThread(void *args) 165{ 166 int j, thread_num; 167 int max_retries = 20; // Max retry on a deadlock 168 char *key_strings[] = {"key 1", "key 2", "key 3", "key 4", 169 "key 5", "key 6", "key 7", "key 8", 170 "key 9", "key 10"}; 171 172 Db *dbp = (Db *)args; 173 DbEnv *envp = dbp->get_env(); 174 175 // Get the thread number 176 (void)mutex_lock(&thread_num_lock); 177 global_thread_num++; 178 thread_num = global_thread_num; 179 (void)mutex_unlock(&thread_num_lock); 180 181 // Initialize the random number generator 182 srand(thread_num); 183 184 // Perform 50 transactions 185 for (int i=0; i<50; i++) { 186 DbTxn *txn; 187 bool retry = true; 188 int retry_count = 0; 189 // while loop is used for deadlock retries 190 while (retry) { 191 // try block used for deadlock detection and 192 // general db exception handling 193 try { 194 195 // Begin our transaction. We group multiple writes in 196 // this thread under a single transaction so as to 197 // (1) show that you can atomically perform multiple 198 // writes at a time, and (2) to increase the chances 199 // of a deadlock occurring so that we can observe our 200 // deadlock detection at work. 201 202 // Normally we would want to avoid the potential for 203 // deadlocks, so for this workload the correct thing 204 // would be to perform our puts with autocommit. But 205 // that would excessively simplify our example, so we 206 // do the "wrong" thing here instead. 207 txn = NULL; 208 envp->txn_begin(NULL, &txn, 0); 209 210 // Perform the database write for this transaction. 211 for (j = 0; j < 10; j++) { 212 Dbt key, value; 213 key.set_data(key_strings[j]); 214 key.set_size((u_int32_t)strlen(key_strings[j]) + 1); 215 216 int payload = rand() + i; 217 value.set_data(&payload); 218 value.set_size(sizeof(int)); 219 220 // Perform the database put 221 dbp->put(txn, &key, &value, 0); 222 } 223 224 // countRecords runs a cursor over the entire database. 225 // We do this to illustrate issues of deadlocking 226 std::cout << thread_num << " : Found " 227 << countRecords(dbp, txn) 228 << " records in the database." << std::endl; 229 230 std::cout << thread_num << " : committing txn : " << i 231 << std::endl; 232 233 // commit 234 try { 235 txn->commit(0); 236 retry = false; 237 txn = NULL; 238 } catch (DbException &e) { 239 std::cout << "Error on txn commit: " 240 << e.what() << std::endl; 241 } 242 } catch (DbDeadlockException &) { 243 // First thing that we MUST do is abort the transaction. 244 if (txn != NULL) 245 (void)txn->abort(); 246 247 // Now we decide if we want to retry the operation. 248 // If we have retried less than max_retries, 249 // increment the retry count and goto retry. 250 if (retry_count < max_retries) { 251 std::cerr << "############### Writer " << thread_num 252 << ": Got DB_LOCK_DEADLOCK.\n" 253 << "Retrying write operation." << std::endl; 254 retry_count++; 255 retry = true; 256 } else { 257 // Otherwise, just give up. 258 std::cerr << "Writer " << thread_num 259 << ": Got DeadLockException and out of " 260 << "retries. Giving up." << std::endl; 261 retry = false; 262 } 263 } catch (DbException &e) { 264 std::cerr << "db put failed" << std::endl; 265 std::cerr << e.what() << std::endl; 266 if (txn != NULL) 267 txn->abort(); 268 retry = false; 269 } catch (std::exception &ee) { 270 std::cerr << "Unknown exception: " << ee.what() << std::endl; 271 return (0); 272 } 273 } 274 } 275 return (0); 276} 277 278 279// This simply counts the number of records contained in the 280// database and returns the result. You can use this method 281// in three ways: 282// 283// First call it with an active txn handle. 284// 285// Secondly, configure the cursor for uncommitted reads 286// 287// Third, call countRecords AFTER the writer has committed 288// its transaction. 289// 290// If you do none of these things, the writer thread will 291// self-deadlock. 292// 293// Note that this method exists only for illustrative purposes. 294// A more straight-forward way to count the number of records in 295// a database is to use the Database.getStats() method. 296int 297countRecords(Db *dbp, DbTxn *txn) 298{ 299 300 Dbc *cursorp = NULL; 301 int count = 0; 302 303 try { 304 // Get the cursor 305 dbp->cursor(txn, &cursorp, 0); 306 307 Dbt key, value; 308 while (cursorp->get(&key, &value, DB_NEXT) == 0) { 309 count++; 310 } 311 } catch (DbDeadlockException &de) { 312 std::cerr << "countRecords: got deadlock" << std::endl; 313 cursorp->close(); 314 throw de; 315 } catch (DbException &e) { 316 std::cerr << "countRecords error:" << std::endl; 317 std::cerr << e.what() << std::endl; 318 } 319 320 if (cursorp != NULL) { 321 try { 322 cursorp->close(); 323 } catch (DbException &e) { 324 std::cerr << "countRecords: cursor close failed:" << std::endl; 325 std::cerr << e.what() << std::endl; 326 } 327 } 328 329 return (count); 330} 331 332 333// Open a Berkeley DB database 334int 335openDb(Db **dbpp, const char *progname, const char *fileName, 336 DbEnv *envp, u_int32_t extraFlags) 337{ 338 int ret; 339 u_int32_t openFlags; 340 341 try { 342 Db *dbp = new Db(envp, 0); 343 344 // Point to the new'd Db 345 *dbpp = dbp; 346 347 if (extraFlags != 0) 348 ret = dbp->set_flags(extraFlags); 349 350 // Now open the database */ 351 openFlags = DB_CREATE | // Allow database creation 352 DB_THREAD | 353 DB_AUTO_COMMIT; // Allow autocommit 354 355 dbp->open(NULL, // Txn pointer 356 fileName, // File name 357 NULL, // Logical db name 358 DB_BTREE, // Database type (using btree) 359 openFlags, // Open flags 360 0); // File mode. Using defaults 361 } catch (DbException &e) { 362 std::cerr << progname << ": openDb: db open failed:" << std::endl; 363 std::cerr << e.what() << std::endl; 364 return (EXIT_FAILURE); 365 } 366 367 return (EXIT_SUCCESS); 368} 369 370