1// File TxnGuideInMemory.cpp
2
#include <cstdlib>
#include <cstring>
#include <iostream>

#include <db_cxx.h>
5
// Minimal portability shim: the rest of the file only uses thread_t,
// mutex_t, thread_create/thread_join and mutex_init/lock/unlock, which
// map onto Win32 handles or POSIX threads depending on the platform.
// Every macro evaluates to 0 on success and non-zero on failure.
#ifdef _WIN32
#include <windows.h>
#define PATHD '\\'
extern "C" {
    extern int getopt(int, char * const *, const char *);
    extern char *optarg;
}

typedef HANDLE thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    (((*(thrp) = CreateThread(NULL, 0,                                     \
        (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// statusp may be NULL when the caller does not care about the thread's
// exit code (as with pthread_join). GetExitCodeThread requires a valid
// output pointer, so the exit code is only fetched when one is supplied.
#define thread_join(thr, statusp)                                          \
    (((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&           \
      (((statusp) == NULL) ||                                              \
       GetExitCodeThread((thr), (LPDWORD)(statusp)))) ? 0 : -1)

typedef HANDLE mutex_t;
#define mutex_init(m, attr)                                                \
    (((*(m) = CreateMutex(NULL, FALSE, NULL)) != NULL) ? 0 : -1)
#define mutex_lock(m)                                                      \
    ((WaitForSingleObject(*(m), INFINITE) == WAIT_OBJECT_0) ? 0 : -1)
#define mutex_unlock(m)         (ReleaseMutex(*(m)) ? 0 : -1)
#else
#include <pthread.h>
#include <unistd.h>
#define PATHD '/'

typedef pthread_t thread_t;
#define thread_create(thrp, attr, func, arg)                               \
    pthread_create((thrp), (attr), (func), (arg))
#define thread_join(thr, statusp) pthread_join((thr), (statusp))

typedef pthread_mutex_t mutex_t;
#define mutex_init(m, attr)     pthread_mutex_init((m), (attr))
#define mutex_lock(m)           pthread_mutex_lock(m)
#define mutex_unlock(m)         pthread_mutex_unlock(m)
#endif
43
44// Run 5 writers threads at a time.
45#define NUMWRITERS 5
46
47// Printing of pthread_t is implementation-specific, so we
48// create our own thread IDs for reporting purposes.
49int global_thread_num;
50mutex_t thread_num_lock;
51
52// Forward declarations
53int countRecords(Db *, DbTxn *);
54int openDb(Db **, const char *, const char *, DbEnv *, u_int32_t);
55int usage(void);
56void *writerThread(void *);
57
58int
59main(void)
60{
61    // Initialize our handles
62    Db *dbp = NULL;
63    DbEnv *envp = NULL;
64
65    thread_t writerThreads[NUMWRITERS];
66    int i;
67    u_int32_t envFlags;
68
69    // Application name
70    const char *progName = "TxnGuideInMemory";
71
72    // Env open flags
73    envFlags =
74      DB_CREATE     |  // Create the environment if it does not exist
75      DB_RECOVER    |  // Run normal recovery.
76      DB_INIT_LOCK  |  // Initialize the locking subsystem
77      DB_INIT_LOG   |  // Initialize the logging subsystem
78      DB_INIT_TXN   |  // Initialize the transactional subsystem. This
79                       // also turns on logging.
80      DB_INIT_MPOOL |  // Initialize the memory pool (in-memory cache)
81      DB_PRIVATE    |  // Region files are not backed by the filesystem.
82                       // Instead, they are backed by heap memory.
83      DB_THREAD;       // Cause the environment to be free-threaded
84
85    try {
86        // Create the environment
87        envp = new DbEnv(0);
88
89        // Specify in-memory logging
90        envp->log_set_config(DB_LOG_IN_MEMORY, 1);
91
92        // Specify the size of the in-memory log buffer.
93        envp->set_lg_bsize(10 * 1024 * 1024);
94
95        // Specify the size of the in-memory cache
96        envp->set_cachesize(0, 10 * 1024 * 1024, 1);
97
98        // Indicate that we want db to internally perform deadlock
99        // detection.  Also indicate that the transaction with
100        // the fewest number of write locks will receive the
101        // deadlock notification in the event of a deadlock.
102        envp->set_lk_detect(DB_LOCK_MINWRITE);
103
104        // Open the environment
105        envp->open(NULL, envFlags, 0);
106
107        // If we had utility threads (for running checkpoints or
108        // deadlock detection, for example) we would spawn those
109        // here. However, for a simple example such as this,
110        // that is not required.
111
112        // Open the database
113        openDb(&dbp, progName, NULL,
114            envp, DB_DUPSORT);
115
116        // Initialize a mutex. Used to help provide thread ids.
117        (void)mutex_init(&thread_num_lock, NULL);
118
119        // Start the writer threads.
120        for (i = 0; i < NUMWRITERS; i++)
121            (void)thread_create(
122                &writerThreads[i], NULL,
123                writerThread,
124                (void *)dbp);
125
126        // Join the writers
127        for (i = 0; i < NUMWRITERS; i++)
128            (void)thread_join(writerThreads[i], NULL);
129
130    } catch(DbException &e) {
131        std::cerr << "Error opening database environment: "
132                  << std::endl;
133        std::cerr << e.what() << std::endl;
134        return (EXIT_FAILURE);
135    }
136
137    try {
138        // Close our database handle if it was opened.
139        if (dbp != NULL)
140            dbp->close(0);
141
142        // Close our environment if it was opened.
143        if (envp != NULL)
144            envp->close(0);
145    } catch(DbException &e) {
146        std::cerr << "Error closing database and environment."
147                  << std::endl;
148        std::cerr << e.what() << std::endl;
149        return (EXIT_FAILURE);
150    }
151
152    // Final status message and return.
153
154    std::cout << "I'm all done." << std::endl;
155    return (EXIT_SUCCESS);
156}
157
158// A function that performs a series of writes to a
159// Berkeley DB database. The information written
160// to the database is largely nonsensical, but the
161// mechanism of transactional commit/abort and
162// deadlock detection is illustrated here.
163void *
164writerThread(void *args)
165{
166    int j, thread_num;
167    int max_retries = 20;   // Max retry on a deadlock
168    char *key_strings[] = {"key 1", "key 2", "key 3", "key 4",
169                           "key 5", "key 6", "key 7", "key 8",
170                           "key 9", "key 10"};
171
172    Db *dbp = (Db *)args;
173    DbEnv *envp = dbp->get_env();
174
175    // Get the thread number
176    (void)mutex_lock(&thread_num_lock);
177    global_thread_num++;
178    thread_num = global_thread_num;
179    (void)mutex_unlock(&thread_num_lock);
180
181    // Initialize the random number generator
182    srand(thread_num);
183
184    // Perform 50 transactions
185    for (int i=0; i<50; i++) {
186        DbTxn *txn;
187        bool retry = true;
188        int retry_count = 0;
189        // while loop is used for deadlock retries
190        while (retry) {
191            // try block used for deadlock detection and
192            // general db exception handling
193            try {
194
195                // Begin our transaction. We group multiple writes in
196                // this thread under a single transaction so as to
197                // (1) show that you can atomically perform multiple
198                // writes at a time, and (2) to increase the chances
199                // of a deadlock occurring so that we can observe our
200                // deadlock detection at work.
201
202                // Normally we would want to avoid the potential for
203                // deadlocks, so for this workload the correct thing
204                // would be to perform our puts with autocommit. But
205                // that would excessively simplify our example, so we
206                // do the "wrong" thing here instead.
207                txn = NULL;
208                envp->txn_begin(NULL, &txn, 0);
209
210                // Perform the database write for this transaction.
211                for (j = 0; j < 10; j++) {
212                    Dbt key, value;
213                    key.set_data(key_strings[j]);
214                    key.set_size((u_int32_t)strlen(key_strings[j]) + 1);
215
216                    int payload = rand() + i;
217                    value.set_data(&payload);
218                    value.set_size(sizeof(int));
219
220                    // Perform the database put
221                    dbp->put(txn, &key, &value, 0);
222                }
223
224                // countRecords runs a cursor over the entire database.
225                // We do this to illustrate issues of deadlocking
226                std::cout << thread_num <<  " : Found "
227                          <<  countRecords(dbp, txn)
228                          << " records in the database." << std::endl;
229
230                std::cout << thread_num <<  " : committing txn : " << i
231                          << std::endl;
232
233                // commit
234                try {
235                    txn->commit(0);
236                    retry = false;
237                    txn = NULL;
238                } catch (DbException &e) {
239                    std::cout << "Error on txn commit: "
240                              << e.what() << std::endl;
241                }
242            } catch (DbDeadlockException &) {
243                // First thing that we MUST do is abort the transaction.
244                if (txn != NULL)
245                    (void)txn->abort();
246
247                // Now we decide if we want to retry the operation.
248                // If we have retried less than max_retries,
249                // increment the retry count and goto retry.
250                if (retry_count < max_retries) {
251                    std::cerr << "############### Writer " << thread_num
252                              << ": Got DB_LOCK_DEADLOCK.\n"
253                              << "Retrying write operation." << std::endl;
254                    retry_count++;
255                    retry = true;
256                 } else {
257                    // Otherwise, just give up.
258                    std::cerr << "Writer " << thread_num
259                              << ": Got DeadLockException and out of "
260                              << "retries. Giving up." << std::endl;
261                    retry = false;
262                 }
263           } catch (DbException &e) {
264                std::cerr << "db put failed" << std::endl;
265                std::cerr << e.what() << std::endl;
266                if (txn != NULL)
267                    txn->abort();
268                retry = false;
269           } catch (std::exception &ee) {
270            std::cerr << "Unknown exception: " << ee.what() << std::endl;
271            return (0);
272          }
273        }
274    }
275    return (0);
276}
277
278
279// This simply counts the number of records contained in the
280// database and returns the result. You can use this method
281// in three ways:
282//
283// First call it with an active txn handle.
284//
285// Secondly, configure the cursor for uncommitted reads
286//
287// Third, call countRecords AFTER the writer has committed
288//    its transaction.
289//
290// If you do none of these things, the writer thread will
291// self-deadlock.
292//
293// Note that this method exists only for illustrative purposes.
294// A more straight-forward way to count the number of records in
295// a database is to use the Database.getStats() method.
296int
297countRecords(Db *dbp, DbTxn *txn)
298{
299
300    Dbc *cursorp = NULL;
301    int count = 0;
302
303    try {
304        // Get the cursor
305        dbp->cursor(txn, &cursorp, 0);
306
307        Dbt key, value;
308        while (cursorp->get(&key, &value, DB_NEXT) == 0) {
309            count++;
310        }
311    } catch (DbDeadlockException &de) {
312        std::cerr << "countRecords: got deadlock" << std::endl;
313        cursorp->close();
314        throw de;
315    } catch (DbException &e) {
316        std::cerr << "countRecords error:" << std::endl;
317        std::cerr << e.what() << std::endl;
318    }
319
320    if (cursorp != NULL) {
321        try {
322            cursorp->close();
323        } catch (DbException &e) {
324            std::cerr << "countRecords: cursor close failed:" << std::endl;
325            std::cerr << e.what() << std::endl;
326        }
327    }
328
329    return (count);
330}
331
332
333// Open a Berkeley DB database
334int
335openDb(Db **dbpp, const char *progname, const char *fileName,
336  DbEnv *envp, u_int32_t extraFlags)
337{
338    int ret;
339    u_int32_t openFlags;
340
341    try {
342        Db *dbp = new Db(envp, 0);
343
344        // Point to the new'd Db
345        *dbpp = dbp;
346
347        if (extraFlags != 0)
348            ret = dbp->set_flags(extraFlags);
349
350        // Now open the database */
351        openFlags = DB_CREATE        | // Allow database creation
352                    DB_THREAD        |
353                    DB_AUTO_COMMIT;    // Allow autocommit
354
355        dbp->open(NULL,       // Txn pointer
356                  fileName,   // File name
357                  NULL,       // Logical db name
358                  DB_BTREE,   // Database type (using btree)
359                  openFlags,  // Open flags
360                  0);         // File mode. Using defaults
361    } catch (DbException &e) {
362        std::cerr << progname << ": openDb: db open failed:" << std::endl;
363        std::cerr << e.what() << std::endl;
364        return (EXIT_FAILURE);
365    }
366
367    return (EXIT_SUCCESS);
368}
369
370