1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2008-2009 Oracle.  All rights reserved.
5 *
6 * $Id$
7 */
8
9// File txn_guide_stl.cpp
10#include <iostream>
11#include <db_cxx.h>
12
13#include "dbstl_map.h"
14
// Minimal portability layer for threads and mutexes.
//
// The same macro names are mapped onto the Win32 API or onto pthreads.
// Every macro evaluates to 0 on success.  On failure the Win32 variants
// evaluate to -1 and the pthread variants evaluate to whatever the
// underlying pthread_* call returns.
#ifdef _WIN32
#include <windows.h>
extern "C" {
    extern int _tgetopt(int nargc, TCHAR* const* nargv, const TCHAR * ostr);
    extern TCHAR *optarg;
}
#define PATHD '\\'

typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// A failed wait must be reported as -1.  (Combining the wait test and the
// exit-code lookup with "&&" would yield 0, i.e. "success", when the wait
// itself fails.)
#define thread_join(thr, statusp)                                          \
    ((WaitForSingleObject((thr), INFINITE) != WAIT_OBJECT_0) ? -1 :        \
    (((statusp) == NULL) ? 0 :                                             \
    (GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)))

typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock(m)
#define mutex_unlock(m)         pthread_mutex_unlock(m)
#endif
53
// Number of writer threads that main() starts and then joins.
#define NUMWRITERS 5
using namespace dbstl;
// Container type shared by all threads: a dbstl multimap with C-string
// keys and int values.
typedef db_multimap<const char *, int, ElementHolder<int> > strmap_t;
// Printing of thread_t is implementation-specific, so we
// create our own thread IDs for reporting purposes.
// global_thread_num is incremented by each writer thread while it holds
// thread_num_lock; as a namespace-scope int it starts out as zero.
int global_thread_num;
mutex_t thread_num_lock;

// Forward declarations
int countRecords(strmap_t *);
int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
int usage(void);
void *writerThread(void *);
68
// Write the option synopsis to the standard error stream.
//
// Always returns EXIT_FAILURE.
int
usage()
{
    static const char synopsis[] =
        " [-h <database_home_directory>] [-m (in memory use)]";

    std::cerr << synopsis;
    std::cerr << std::endl;
    return (EXIT_FAILURE);
}
77
78int
79main(int argc, char *argv[])
80{
81    // Initialize our handles
82    Db *dbp = NULL;
83    DbEnv *envp = NULL;
84
85    thread_t writerThreads[NUMWRITERS];
86    int i, inmem;
87    u_int32_t envFlags;
88    const char *dbHomeDir;
89
90    inmem = 0;
91    // Application name
92    const char *progName = "TxnGuideStl";
93
94    // Database file name
95    const char *fileName = "mydb.db";
96
97    // Parse the command line arguments
98#ifdef _WIN32
99    dbHomeDir = ".\\TESTDIR";
100#else
101    dbHomeDir = "./TESTDIR";
102#endif
103
104    // Env open flags
105    envFlags =
106      DB_CREATE     |  // Create the environment if it does not exist
107      DB_RECOVER    |  // Run normal recovery.
108      DB_INIT_LOCK  |  // Initialize the locking subsystem
109      DB_INIT_LOG   |  // Initialize the logging subsystem
110      DB_INIT_TXN   |  // Initialize the transactional subsystem. This
111                       // also turns on logging.
112      DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
113      DB_THREAD;       // Cause the environment to be free-threaded
114
115    try {
116        // Create and open the environment
117        envp = new DbEnv(DB_CXX_NO_EXCEPTIONS);
118
119        // Indicate that we want db to internally perform deadlock
120        // detection.  Also indicate that the transaction with
121        // the fewest number of write locks will receive the
122        // deadlock notification in the event of a deadlock.
123        envp->set_lk_detect(DB_LOCK_MINWRITE);
124
125        if (inmem) {
126            envp->set_lg_bsize(64 * 1024 * 1024);
127            envp->open(NULL, envFlags, 0644);
128            fileName = NULL;
129        } else
130            envp->open(dbHomeDir, envFlags, 0644);
131
132        // If we had utility threads (for running checkpoints or
133        // deadlock detection, for example) we would spawn those
134        // here. However, for a simple example such as this,
135        // that is not required.
136
137        // Open the database
138        openDb(&dbp, progName, fileName,
139            envp, DB_DUP);
140
141        // Call this function before any use of dbstl in a single thread
142        // if multiple threads are using dbstl.
143        dbstl::dbstl_startup();
144
145        // We created the dbp and envp handles not via dbstl::open_db/open_env
146        // functions, so we must register the handles in each thread using the
147        // container.
148        dbstl::register_db(dbp);
149        dbstl::register_db_env(envp);
150
151        strmap_t *strmap = new strmap_t(dbp, envp);
152        // Initialize a mutex. Used to help provide thread ids.
153        (void)mutex_init(&thread_num_lock, NULL);
154
155        // Start the writer threads.
156        for (i = 0; i < NUMWRITERS; i++)
157            (void)thread_create(&writerThreads[i], NULL,
158                writerThread, (void *)strmap);
159
160
161        // Join the writers
162        for (i = 0; i < NUMWRITERS; i++)
163            (void)thread_join(writerThreads[i], NULL);
164
165        delete strmap;
166
167    } catch(DbException &e) {
168        std::cerr << "Error opening database environment: "
169                  << (inmem ? "NULL" : dbHomeDir) << std::endl;
170        std::cerr << e.what() << std::endl;
171        dbstl_exit();
172        return (EXIT_FAILURE);
173    }
174
175    // Environment and database will be automatically closed by dbstl.
176
177    // Final status message and return.
178
179    std::cout << "I'm all done." << std::endl;
180
181    dbstl_exit();
182    delete envp;
183    return (EXIT_SUCCESS);
184}
185
186// A function that performs a series of writes to a
187// Berkeley DB database. The information written
188// to the database is largely nonsensical, but the
189// mechanism of transactional commit/abort and
190// deadlock detection is illustrated here.
191void *
192writerThread(void *args)
193{
194    int j, thread_num;
195    int max_retries = 1;   // Max retry on a deadlock
196    const char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
197                           "key 5", "key 6", "key 7", "key 8",
198                           "key 9", "key 10"};
199
200    strmap_t *strmap = (strmap_t *)args;
201    DbEnv *envp = strmap->get_db_env_handle();
202
203    // We created the dbp and envp handles not via dbstl::open_db/open_env
204    // functions, so we must register the handles in each thread using the
205    // container.
206    dbstl::register_db(strmap->get_db_handle());
207    dbstl::register_db_env(envp);
208
209    // Get the thread number
210    (void)mutex_lock(&thread_num_lock);
211    global_thread_num++;
212    thread_num = global_thread_num;
213    (void)mutex_unlock(&thread_num_lock);
214
215    // Initialize the random number generator
216    srand(thread_num);
217
218    // Perform 50 transactions
219    for (int i = 0; i < 1; i++) {
220        DbTxn *txn;
221        int retry = 100;
222        int retry_count = 0, payload;
223        // while loop is used for deadlock retries
224        while (retry--) {
225            // try block used for deadlock detection and
226            // general db exception handling
227            try {
228
229                // Begin our transaction. We group multiple writes in
230                // this thread under a single transaction so as to
231                // (1) show that you can atomically perform multiple
232                // writes at a time, and (2) to increase the chances
233                // of a deadlock occurring so that we can observe our
234                // deadlock detection at work.
235
236                // Normally we would want to avoid the potential for
237                // deadlocks, so for this workload the correct thing
238                // would be to perform our puts with autocommit. But
239                // that would excessively simplify our example, so we
240                // do the "wrong" thing here instead.
241                txn = dbstl::begin_txn(0, envp);
242
243                // Perform the database write for this transaction.
244                for (j = 0; j < 10; j++) {
245                    payload = rand() + i;
246                    strmap->insert(make_pair(key_strings[j], payload));
247                }
248
249                // countRecords runs a cursor over the entire database.
250                // We do this to illustrate issues of deadlocking
251                std::cout << thread_num <<  " : Found "
252                          << countRecords(strmap)
253                          << " records in the database." << std::endl;
254
255                std::cout << thread_num <<  " : committing txn : " << i
256                          << std::endl;
257
258                // commit
259                try {
260                    dbstl::commit_txn(envp);
261                } catch (DbException &e) {
262                    std::cout << "Error on txn commit: "
263                              << e.what() << std::endl;
264                }
265            } catch (DbDeadlockException &) {
266                // First thing that we MUST do is abort the transaction.
267                try {
268                    dbstl::abort_txn(envp);
269                } catch (DbException ex1) {
270			std::cout<<ex1.what();
271                }
272
273                // Now we decide if we want to retry the operation.
274                // If we have retried less than max_retries,
275                // increment the retry count and goto retry.
276                if (retry_count < max_retries) {
277                    std::cout << "############### Writer " << thread_num
278                              << ": Got DB_LOCK_DEADLOCK.\n"
279                              << "Retrying write operation."
280                              << std::endl;
281                    retry_count++;
282
283                 } else {
284                    // Otherwise, just give up.
285                    std::cerr << "Writer " << thread_num
286                              << ": Got DeadLockException and out of "
287                              << "retries. Giving up." << std::endl;
288                    retry = 0;
289                 }
290           } catch (DbException &e) {
291                std::cerr << "db_map<> storage failed" << std::endl;
292                std::cerr << e.what() << std::endl;
293                dbstl::abort_txn(envp);
294                retry = 0;
295           } catch (std::exception &ee) {
296                std::cerr << "Unknown exception: " << ee.what() << std::endl;
297                return (0);
298          }
299        }
300    }
301    return (0);
302}
303
304
305// This simply counts the number of records contained in the
306// database and returns the result.
307//
308// Note that this method exists only for illustrative purposes.
309// A more straight-forward way to count the number of records in
310// a database is to use the db_map<>::size() method.
311int
312countRecords(strmap_t *strmap)
313{
314
315    int count = 0;
316    strmap_t::iterator itr;
317    try {
318        // Set the flag used by Db::cursor.
319        for (itr = strmap->begin(); itr != strmap->end(); ++itr)
320            count++;
321    } catch (DbDeadlockException &de) {
322        std::cerr << "countRecords: got deadlock" << std::endl;
323        // itr's cursor will be automatically closed when it is destructed.
324        throw de;
325    } catch (DbException &e) {
326        std::cerr << "countRecords error:" << std::endl;
327        std::cerr << e.what() << std::endl;
328    }
329
330    // itr's cursor will be automatically closed when it is destructed.
331
332    return (count);
333}
334
335
336// Open a Berkeley DB database
337int
338openDb(Db **dbpp, const char *progname, const char *fileName,
339  DbEnv *envp, u_int32_t extraFlags)
340{
341    int ret;
342    u_int32_t openFlags;
343
344    try {
345        Db *dbp = new Db(envp, DB_CXX_NO_EXCEPTIONS);
346
347        // Point to the new'd Db.
348        *dbpp = dbp;
349
350        if (extraFlags != 0)
351            ret = dbp->set_flags(extraFlags);
352
353        // Now open the database.
354        openFlags = DB_CREATE              | // Allow database creation
355                    DB_READ_UNCOMMITTED    | // Allow uncommitted reads
356                    DB_AUTO_COMMIT;          // Allow autocommit
357
358        dbp->open(NULL,       // Txn pointer
359                  fileName,   // File name
360                  NULL,       // Logical db name
361                  DB_BTREE,   // Database type (using btree)
362                  openFlags,  // Open flags
363                  0);         // File mode. Using defaults
364    } catch (DbException &e) {
365        std::cerr << progname << ": openDb: db open failed:" << std::endl;
366        std::cerr << e.what() << std::endl;
367        return (EXIT_FAILURE);
368    }
369
370    return (EXIT_SUCCESS);
371}
372
373