1/*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2008-2009 Oracle. All rights reserved. 5 * 6 * $Id$ 7 */ 8 9// File txn_guide_stl.cpp 10#include <iostream> 11#include <db_cxx.h> 12 13#include "dbstl_map.h" 14 15#ifdef _WIN32 16#include <windows.h> 17extern "C" { 18 extern int _tgetopt(int nargc, TCHAR* const* nargv, const TCHAR * ostr); 19 extern TCHAR *optarg; 20} 21#define PATHD '\\' 22 23typedef HANDLE thread_t; 24#define thread_create(thrp, attr, func, arg) \ 25 (((*(thrp) = CreateThread(NULL, 0, \ 26 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) 27#define thread_join(thr, statusp) \ 28 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ 29 ((statusp == NULL) ? 0 : \ 30 (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1))) 31 32typedef HANDLE mutex_t; 33#define mutex_init(m, attr) \ 34 (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1) 35#define mutex_lock(m) \ 36 ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1) 37#define mutex_unlock(m) (ReleaseMutex(*(m)) ? 0 : -1) 38#else 39#include <pthread.h> 40#include <unistd.h> 41#define PATHD '/' 42 43typedef pthread_t thread_t; 44#define thread_create(thrp, attr, func, arg) \ 45 pthread_create((thrp), (attr), (func), (arg)) 46#define thread_join(thr, statusp) pthread_join((thr), (statusp)) 47 48typedef pthread_mutex_t mutex_t; 49#define mutex_init(m, attr) pthread_mutex_init((m), (attr)) 50#define mutex_lock(m) pthread_mutex_lock(m) 51#define mutex_unlock(m) pthread_mutex_unlock(m) 52#endif 53 54// Run 5 writers threads at a time. 55#define NUMWRITERS 5 56using namespace dbstl; 57typedef db_multimap<const char *, int, ElementHolder<int> > strmap_t; 58// Printing of thread_t is implementation-specific, so we 59// create our own thread IDs for reporting purposes. 60int global_thread_num; 61mutex_t thread_num_lock; 62 63// Forward declarations 64int countRecords(strmap_t *); 65int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t); 66int usage(void); 67void *writerThread(void *); 68 69// Usage function 70int 71usage() 72{ 73 std::cerr << " [-h <database_home_directory>] [-m (in memory use)]" 74 << std::endl; 75 return (EXIT_FAILURE); 76} 77 78int 79main(int argc, char *argv[]) 80{ 81 // Initialize our handles 82 Db *dbp = NULL; 83 DbEnv *envp = NULL; 84 85 thread_t writerThreads[NUMWRITERS]; 86 int i, inmem; 87 u_int32_t envFlags; 88 const char *dbHomeDir; 89 90 inmem = 0; 91 // Application name 92 const char *progName = "TxnGuideStl"; 93 94 // Database file name 95 const char *fileName = "mydb.db"; 96 97 // Parse the command line arguments 98#ifdef _WIN32 99 dbHomeDir = ".\\TESTDIR"; 100#else 101 dbHomeDir = "./TESTDIR"; 102#endif 103 104 // Env open flags 105 envFlags = 106 DB_CREATE | // Create the environment if it does not exist 107 DB_RECOVER | // Run normal recovery. 108 DB_INIT_LOCK | // Initialize the locking subsystem 109 DB_INIT_LOG | // Initialize the logging subsystem 110 DB_INIT_TXN | // Initialize the transactional subsystem. This 111 // also turns on logging. 112 DB_INIT_MPOOL | // Initialize the memory pool (in-memory cache) 113 DB_THREAD; // Cause the environment to be free-threaded 114 115 try { 116 // Create and open the environment 117 envp = new DbEnv(DB_CXX_NO_EXCEPTIONS); 118 119 // Indicate that we want db to internally perform deadlock 120 // detection. Also indicate that the transaction with 121 // the fewest number of write locks will receive the 122 // deadlock notification in the event of a deadlock. 123 envp->set_lk_detect(DB_LOCK_MINWRITE); 124 125 if (inmem) { 126 envp->set_lg_bsize(64 * 1024 * 1024); 127 envp->open(NULL, envFlags, 0644); 128 fileName = NULL; 129 } else 130 envp->open(dbHomeDir, envFlags, 0644); 131 132 // If we had utility threads (for running checkpoints or 133 // deadlock detection, for example) we would spawn those 134 // here. However, for a simple example such as this, 135 // that is not required. 136 137 // Open the database 138 openDb(&dbp, progName, fileName, 139 envp, DB_DUP); 140 141 // Call this function before any use of dbstl in a single thread 142 // if multiple threads are using dbstl. 143 dbstl::dbstl_startup(); 144 145 // We created the dbp and envp handles not via dbstl::open_db/open_env 146 // functions, so we must register the handles in each thread using the 147 // container. 148 dbstl::register_db(dbp); 149 dbstl::register_db_env(envp); 150 151 strmap_t *strmap = new strmap_t(dbp, envp); 152 // Initialize a mutex. Used to help provide thread ids. 153 (void)mutex_init(&thread_num_lock, NULL); 154 155 // Start the writer threads. 156 for (i = 0; i < NUMWRITERS; i++) 157 (void)thread_create(&writerThreads[i], NULL, 158 writerThread, (void *)strmap); 159 160 161 // Join the writers 162 for (i = 0; i < NUMWRITERS; i++) 163 (void)thread_join(writerThreads[i], NULL); 164 165 delete strmap; 166 167 } catch(DbException &e) { 168 std::cerr << "Error opening database environment: " 169 << (inmem ? "NULL" : dbHomeDir) << std::endl; 170 std::cerr << e.what() << std::endl; 171 dbstl_exit(); 172 return (EXIT_FAILURE); 173 } 174 175 // Environment and database will be automatically closed by dbstl. 176 177 // Final status message and return. 178 179 std::cout << "I'm all done." << std::endl; 180 181 dbstl_exit(); 182 delete envp; 183 return (EXIT_SUCCESS); 184} 185 186// A function that performs a series of writes to a 187// Berkeley DB database. The information written 188// to the database is largely nonsensical, but the 189// mechanism of transactional commit/abort and 190// deadlock detection is illustrated here. 191void * 192writerThread(void *args) 193{ 194 int j, thread_num; 195 int max_retries = 1; // Max retry on a deadlock 196 const char *key_strings[] = {"key 1", "key 2", "key 3", "key 4", 197 "key 5", "key 6", "key 7", "key 8", 198 "key 9", "key 10"}; 199 200 strmap_t *strmap = (strmap_t *)args; 201 DbEnv *envp = strmap->get_db_env_handle(); 202 203 // We created the dbp and envp handles not via dbstl::open_db/open_env 204 // functions, so we must register the handles in each thread using the 205 // container. 206 dbstl::register_db(strmap->get_db_handle()); 207 dbstl::register_db_env(envp); 208 209 // Get the thread number 210 (void)mutex_lock(&thread_num_lock); 211 global_thread_num++; 212 thread_num = global_thread_num; 213 (void)mutex_unlock(&thread_num_lock); 214 215 // Initialize the random number generator 216 srand(thread_num); 217 218 // Perform 50 transactions 219 for (int i = 0; i < 1; i++) { 220 DbTxn *txn; 221 int retry = 100; 222 int retry_count = 0, payload; 223 // while loop is used for deadlock retries 224 while (retry--) { 225 // try block used for deadlock detection and 226 // general db exception handling 227 try { 228 229 // Begin our transaction. We group multiple writes in 230 // this thread under a single transaction so as to 231 // (1) show that you can atomically perform multiple 232 // writes at a time, and (2) to increase the chances 233 // of a deadlock occurring so that we can observe our 234 // deadlock detection at work. 235 236 // Normally we would want to avoid the potential for 237 // deadlocks, so for this workload the correct thing 238 // would be to perform our puts with autocommit. But 239 // that would excessively simplify our example, so we 240 // do the "wrong" thing here instead. 241 txn = dbstl::begin_txn(0, envp); 242 243 // Perform the database write for this transaction. 244 for (j = 0; j < 10; j++) { 245 payload = rand() + i; 246 strmap->insert(make_pair(key_strings[j], payload)); 247 } 248 249 // countRecords runs a cursor over the entire database. 250 // We do this to illustrate issues of deadlocking 251 std::cout << thread_num << " : Found " 252 << countRecords(strmap) 253 << " records in the database." << std::endl; 254 255 std::cout << thread_num << " : committing txn : " << i 256 << std::endl; 257 258 // commit 259 try { 260 dbstl::commit_txn(envp); 261 } catch (DbException &e) { 262 std::cout << "Error on txn commit: " 263 << e.what() << std::endl; 264 } 265 } catch (DbDeadlockException &) { 266 // First thing that we MUST do is abort the transaction. 267 try { 268 dbstl::abort_txn(envp); 269 } catch (DbException ex1) { 270 std::cout<<ex1.what(); 271 } 272 273 // Now we decide if we want to retry the operation. 274 // If we have retried less than max_retries, 275 // increment the retry count and goto retry. 276 if (retry_count < max_retries) { 277 std::cout << "############### Writer " << thread_num 278 << ": Got DB_LOCK_DEADLOCK.\n" 279 << "Retrying write operation." 280 << std::endl; 281 retry_count++; 282 283 } else { 284 // Otherwise, just give up. 285 std::cerr << "Writer " << thread_num 286 << ": Got DeadLockException and out of " 287 << "retries. Giving up." << std::endl; 288 retry = 0; 289 } 290 } catch (DbException &e) { 291 std::cerr << "db_map<> storage failed" << std::endl; 292 std::cerr << e.what() << std::endl; 293 dbstl::abort_txn(envp); 294 retry = 0; 295 } catch (std::exception &ee) { 296 std::cerr << "Unknown exception: " << ee.what() << std::endl; 297 return (0); 298 } 299 } 300 } 301 return (0); 302} 303 304 305// This simply counts the number of records contained in the 306// database and returns the result. 307// 308// Note that this method exists only for illustrative purposes. 309// A more straight-forward way to count the number of records in 310// a database is to use the db_map<>::size() method. 311int 312countRecords(strmap_t *strmap) 313{ 314 315 int count = 0; 316 strmap_t::iterator itr; 317 try { 318 // Set the flag used by Db::cursor. 319 for (itr = strmap->begin(); itr != strmap->end(); ++itr) 320 count++; 321 } catch (DbDeadlockException &de) { 322 std::cerr << "countRecords: got deadlock" << std::endl; 323 // itr's cursor will be automatically closed when it is destructed. 324 throw de; 325 } catch (DbException &e) { 326 std::cerr << "countRecords error:" << std::endl; 327 std::cerr << e.what() << std::endl; 328 } 329 330 // itr's cursor will be automatically closed when it is destructed. 331 332 return (count); 333} 334 335 336// Open a Berkeley DB database 337int 338openDb(Db **dbpp, const char *progname, const char *fileName, 339 DbEnv *envp, u_int32_t extraFlags) 340{ 341 int ret; 342 u_int32_t openFlags; 343 344 try { 345 Db *dbp = new Db(envp, DB_CXX_NO_EXCEPTIONS); 346 347 // Point to the new'd Db. 348 *dbpp = dbp; 349 350 if (extraFlags != 0) 351 ret = dbp->set_flags(extraFlags); 352 353 // Now open the database. 354 openFlags = DB_CREATE | // Allow database creation 355 DB_READ_UNCOMMITTED | // Allow uncommitted reads 356 DB_AUTO_COMMIT; // Allow autocommit 357 358 dbp->open(NULL, // Txn pointer 359 fileName, // File name 360 NULL, // Logical db name 361 DB_BTREE, // Database type (using btree) 362 openFlags, // Open flags 363 0); // File mode. Using defaults 364 } catch (DbException &e) { 365 std::cerr << progname << ": openDb: db open failed:" << std::endl; 366 std::cerr << e.what() << std::endl; 367 return (EXIT_FAILURE); 368 } 369 370 return (EXIT_SUCCESS); 371} 372 373