1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2005-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9// File TxnGuideInMemory.cpp 10 11#include <cstdlib> 12#include <cstring> 13#include <iostream> 14#include <db_cxx.h> 15 16#ifdef _WIN32 17#include <windows.h> 18#define PATHD '\\' 19extern "C" { 20 extern int getopt(int, char * const *, const char *); 21 extern char *optarg; 22} 23 24typedef HANDLE thread_t; 25#define thread_create(thrp, attr, func, arg) \ 26 (((*(thrp) = CreateThread(NULL, 0, \ 27 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 28#define thread_join(thr, statusp) \ 29 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 30 ((statusp == NULL) ? 0 : \ 31 (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1))) 32 33typedef HANDLE mutex_t; 34#define mutex_init(m, attr) \ 35 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 36#define mutex_lock(m) \ 37 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 38#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 39#else 40#include <pthread.h> 41#include <unistd.h> 42#define PATHD '/' 43 44typedef pthread_t thread_t; 45#define thread_create(thrp, attr, func, arg) \ 46 pthread_create((thrp), (attr), (func), (arg)) 47#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 48 49typedef pthread_mutex_t mutex_t; 50#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 51#define mutex_lock(m) pthread_mutex_lock(m) 52#define mutex_unlock(m) pthread_mutex_unlock(m) 53#endif 54 55// Run 5 writers threads at a time. 56#define NUMWRITERS 5 57 58// Printing of pthread_t is implementation-specific, so we 59// create our own thread IDs for reporting purposes. 60int global_thread_num; 61mutex_t thread_num_lock; 62 63// Forward declarations 64int countRecords(Db *, DbTxn *); 65int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t); 66int usage(void); 67void *writerThread(void *); 68 69int 70main(void) 71{ 72 // Initialize our handles 73 Db *dbp = NULL; 74 DbEnv *envp = NULL; 75 76 thread_t writerThreads[NUMWRITERS]; 77 int i; 78 u_int32_t envFlags; 79 80 // Application name 81 const char *progName = "TxnGuideInMemory"; 82 83 // Env open flags 84 envFlags = 85 DB_CREATE | // Create the environment if it does not exist 86 DB_RECOVER | // Run normal recovery. 87 DB_INIT_LOCK | // Initialize the locking subsystem 88 DB_INIT_LOG | // Initialize the logging subsystem 89 DB_INIT_TXN | // Initialize the transactional subsystem. This 90 // also turns on logging. 91 DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache) 92 DB_PRIVATE | // Region files are not backed by the filesystem. 93 // Instead, they are backed by heap memory. 94 DB_THREAD; // Cause the environment to be free-threaded 95 96 try { 97 // Create the environment 98 envp = new DbEnv(0); 99 100 // Specify in-memory logging 101 envp->log_set_config(DB_LOG_IN_MEMORY, 1); 102 103 // Specify the size of the in-memory log buffer. 104 envp->set_lg_bsize(10 * 1024 * 1024); 105 106 // Specify the size of the in-memory cache 107 envp->set_cachesize(0, 10 * 1024 * 1024, 1); 108 109 // Indicate that we want db to internally perform deadlock 110 // detection. Also indicate that the transaction with 111 // the fewest number of write locks will receive the 112 // deadlock notification in the event of a deadlock. 113 envp->set_lk_detect(DB_LOCK_MINWRITE); 114 115 // Open the environment 116 envp->open(NULL, envFlags, 0); 117 118 // If we had utility threads (for running checkpoints or 119 // deadlock detection, for example) we would spawn those 120 // here. However, for a simple example such as this, 121 // that is not required. 122 123 // Open the database 124 openDb(&dbp, progName, NULL, 125 envp, DB_DUPSORT); 126 127 // Initialize a mutex. Used to help provide thread ids. 128 (void)mutex_init(&thread_num_lock, NULL); 129 130 // Start the writer threads. 131 for (i = 0; i < NUMWRITERS; i++) 132 (void)thread_create( 133 &writerThreads[i], NULL, 134 writerThread, 135 (void *)dbp); 136 137 // Join the writers 138 for (i = 0; i < NUMWRITERS; i++) 139 (void)thread_join(writerThreads[i], NULL); 140 141 } catch(DbException &e) { 142 std::cerr << "Error opening database environment: " 143 << std::endl; 144 std::cerr << e.what() << std::endl; 145 return (EXIT_FAILURE); 146 } 147 148 try { 149 // Close our database handle if it was opened. 150 if (dbp != NULL) 151 dbp->close(0); 152 153 // Close our environment if it was opened. 154 if (envp != NULL) 155 envp->close(0); 156 } catch(DbException &e) { 157 std::cerr << "Error closing database and environment." 158 << std::endl; 159 std::cerr << e.what() << std::endl; 160 return (EXIT_FAILURE); 161 } 162 163 // Final status message and return. 164 165 std::cout << "I'm all done." << std::endl; 166 return (EXIT_SUCCESS); 167} 168 169// A function that performs a series of writes to a 170// Berkeley DB database. The information written 171// to the database is largely nonsensical, but the 172// mechanism of transactional commit/abort and 173// deadlock detection is illustrated here. 174void * 175writerThread(void *args) 176{ 177 int j, thread_num; 178 int max_retries = 20; // Max retry on a deadlock 179 const char *key_strings[] = {"key 1", "key 2", "key 3", "key 4", 180 "key 5", "key 6", "key 7", "key 8", 181 "key 9", "key 10"}; 182 183 Db *dbp = (Db *)args; 184 DbEnv *envp = dbp->get_env(); 185 186 // Get the thread number 187 (void)mutex_lock(&thread_num_lock); 188 global_thread_num++; 189 thread_num = global_thread_num; 190 (void)mutex_unlock(&thread_num_lock); 191 192 // Initialize the random number generator 193 srand(thread_num); 194 195 // Perform 50 transactions 196 for (int i=0; i<50; i++) { 197 DbTxn *txn; 198 bool retry = true; 199 int retry_count = 0; 200 // while loop is used for deadlock retries 201 while (retry) { 202 // try block used for deadlock detection and 203 // general db exception handling 204 try { 205 206 // Begin our transaction. We group multiple writes in 207 // this thread under a single transaction so as to 208 // (1) show that you can atomically perform multiple 209 // writes at a time, and (2) to increase the chances 210 // of a deadlock occurring so that we can observe our 211 // deadlock detection at work. 212 213 // Normally we would want to avoid the potential for 214 // deadlocks, so for this workload the correct thing 215 // would be to perform our puts with autocommit. But 216 // that would excessively simplify our example, so we 217 // do the "wrong" thing here instead. 218 txn = NULL; 219 envp->txn_begin(NULL, &txn, 0); 220 221 // Perform the database write for this transaction. 222 for (j = 0; j < 10; j++) { 223 Dbt key, value; 224 key.set_data((void *)key_strings[j]); 225 key.set_size((u_int32_t)strlen(key_strings[j]) + 1); 226 227 int payload = rand() + i; 228 value.set_data(&payload); 229 value.set_size(sizeof(int)); 230 231 // Perform the database put 232 dbp->put(txn, &key, &value, 0); 233 } 234 235 // countRecords runs a cursor over the entire database. 236 // We do this to illustrate issues of deadlocking 237 std::cout << thread_num << " : Found " 238 << countRecords(dbp, txn) 239 << " records in the database." << std::endl; 240 241 std::cout << thread_num << " : committing txn : " << i 242 << std::endl; 243 244 // commit 245 try { 246 txn->commit(0); 247 retry = false; 248 txn = NULL; 249 } catch (DbException &e) { 250 std::cout << "Error on txn commit: " 251 << e.what() << std::endl; 252 } 253 } catch (DbDeadlockException &) { 254 // First thing that we MUST do is abort the transaction. 255 if (txn != NULL) 256 (void)txn->abort(); 257 258 // Now we decide if we want to retry the operation. 259 // If we have retried less than max_retries, 260 // increment the retry count and goto retry. 261 if (retry_count < max_retries) { 262 std::cerr << "############### Writer " << thread_num 263 << ": Got DB_LOCK_DEADLOCK.\n" 264 << "Retrying write operation." << std::endl; 265 retry_count++; 266 retry = true; 267 } else { 268 // Otherwise, just give up. 269 std::cerr << "Writer " << thread_num 270 << ": Got DeadLockException and out of " 271 << "retries. Giving up." << std::endl; 272 retry = false; 273 } 274 } catch (DbException &e) { 275 std::cerr << "db put failed" << std::endl; 276 std::cerr << e.what() << std::endl; 277 if (txn != NULL) 278 txn->abort(); 279 retry = false; 280 } catch (std::exception &ee) { 281 std::cerr << "Unknown exception: " << ee.what() << std::endl; 282 return (0); 283 } 284 } 285 } 286 return (0); 287} 288 289 290// This simply counts the number of records contained in the 291// database and returns the result. You can use this method 292// in three ways: 293// 294// First call it with an active txn handle. 295// 296// Secondly, configure the cursor for uncommitted reads 297// 298// Third, call countRecords AFTER the writer has committed 299// its transaction. 300// 301// If you do none of these things, the writer thread will 302// self-deadlock. 303// 304// Note that this method exists only for illustrative purposes. 305// A more straight-forward way to count the number of records in 306// a database is to use the Database.getStats() method. 307int 308countRecords(Db *dbp, DbTxn *txn) 309{ 310 311 Dbc *cursorp = NULL; 312 int count = 0; 313 314 try { 315 // Get the cursor 316 dbp->cursor(txn, &cursorp, 0); 317 318 Dbt key, value; 319 while (cursorp->get(&key, &value, DB_NEXT) == 0) { 320 count++; 321 } 322 } catch (DbDeadlockException &de) { 323 std::cerr << "countRecords: got deadlock" << std::endl; 324 cursorp->close(); 325 throw de; 326 } catch (DbException &e) { 327 std::cerr << "countRecords error:" << std::endl; 328 std::cerr << e.what() << std::endl; 329 } 330 331 if (cursorp != NULL) { 332 try { 333 cursorp->close(); 334 } catch (DbException &e) { 335 std::cerr << "countRecords: cursor close failed:" << std::endl; 336 std::cerr << e.what() << std::endl; 337 } 338 } 339 340 return (count); 341} 342 343 344// Open a Berkeley DB database 345int 346openDb(Db **dbpp, const char *progname, const char *fileName, 347 DbEnv *envp, u_int32_t extraFlags) 348{ 349 int ret; 350 u_int32_t openFlags; 351 352 try { 353 Db *dbp = new Db(envp, 0); 354 355 // Point to the new'd Db 356 *dbpp = dbp; 357 358 if (extraFlags != 0) 359 ret = dbp->set_flags(extraFlags); 360 361 // Now open the database */ 362 openFlags = DB_CREATE | // Allow database creation 363 DB_THREAD | 364 DB_AUTO_COMMIT; // Allow autocommit 365 366 dbp->open(NULL, // Txn pointer 367 fileName, // File name 368 NULL, // Logical db name 369 DB_BTREE, // Database type (using btree) 370 openFlags, // Open flags 371 0); // File mode. Using defaults 372 } catch (DbException &e) { 373 std::cerr << progname << ": openDb: db open failed:" << std::endl; 374 std::cerr << e.what() << std::endl; 375 return (EXIT_FAILURE); 376 } 377 378 return (EXIT_SUCCESS); 379} 380 381