1package db.txn;
2
3import com.sleepycat.bind.EntryBinding;
4import com.sleepycat.bind.serial.StoredClassCatalog;
5import com.sleepycat.bind.serial.SerialBinding;
6import com.sleepycat.bind.tuple.StringBinding;
7
8import com.sleepycat.db.Cursor;
9import com.sleepycat.db.CursorConfig;
10import com.sleepycat.db.Database;
11import com.sleepycat.db.DatabaseEntry;
12import com.sleepycat.db.DatabaseException;
13import com.sleepycat.db.DeadlockException;
14import com.sleepycat.db.Environment;
15import com.sleepycat.db.LockMode;
16import com.sleepycat.db.OperationStatus;
17import com.sleepycat.db.Transaction;
18
19import java.io.UnsupportedEncodingException;
20import java.util.Random;
21
22public class DBWriter extends Thread
23{
24    private Database myDb = null;
25    private Environment myEnv = null;
26    private EntryBinding dataBinding = null;
27    private Random generator = new Random();
28    private boolean passTxn = false;
29
30
31    private static final int MAX_RETRY = 20;
32
33    private static String[] keys = {"key 1", "key 2", "key 3",
34                                    "key 4", "key 5", "key 6",
35                                    "key 7", "key 8", "key 9",
36                                    "key 10"};
37
38
39    // Constructor. Get our DB handles from here
40    // This consturctor allows us to indicate whether the
41    // txn handle should be handed to countRecords()
42    DBWriter(Environment env, Database db, StoredClassCatalog scc,
43        boolean passtxn)
44
45        throws DatabaseException {
46        myDb = db;
47        myEnv = env;
48        dataBinding = new SerialBinding(scc, PayloadData.class);
49
50        passTxn = passtxn;
51    }
52
53    // Constructor. Get our DB handles from here
54    DBWriter(Environment env, Database db, StoredClassCatalog scc)
55
56        throws DatabaseException {
57        myDb = db;
58        myEnv = env;
59        dataBinding = new SerialBinding(scc, PayloadData.class);
60    }
61
62    // Thread method that writes a series of records
63    // to the database using transaction protection.
64    // Deadlock handling is demonstrated here.
65    public void run () {
66        Transaction txn = null;
67
68        // Perform 50 transactions
69        for (int i=0; i<50; i++) {
70
71           boolean retry = true;
72           int retry_count = 0;
73           // while loop is used for deadlock retries
74           while (retry) {
75                // try block used for deadlock detection and
76                // general db exception handling
77                try {
78
79                    // Get a transaction
80                    txn = myEnv.beginTransaction(null, null);
81
82                    // Write 10 records to the db
83                    // for each transaction
84                    for (int j = 0; j < 10; j++) {
85                        // Get the key
86                        DatabaseEntry key = new DatabaseEntry();
87                        StringBinding.stringToEntry(keys[j], key);
88
89                        // Get the data
90                        PayloadData pd = new PayloadData(i+j, getName(),
91                            generator.nextDouble());
92                        DatabaseEntry data = new DatabaseEntry();
93                        dataBinding.objectToEntry(pd, data);
94
95                        // Do the put
96                        myDb.put(txn, key, data);
97                    }
98
99                    // commit
100                    System.out.println(getName() + " : committing txn : " + i);
101
102                    // This code block allows us to decide if txn handle is
103                    // passed to countRecords()
104                    //
105                    // TxnGuideInMemory requires a txn handle be handed to
106                    // countRecords(). The code self deadlocks if you don't.
107                    // TxnGuide has no such requirement because it supports
108                    // uncommitted reads.
109                    Transaction txnHandle = null;
110                    if (passTxn) { txnHandle = txn; }
111
112                    System.out.println(getName() + " : Found " +
113                        countRecords(txnHandle) + " records in the database.");
114                    try {
115                        txn.commit();
116                        txn = null;
117                    } catch (DatabaseException e) {
118                        System.err.println("Error on txn commit: " +
119                            e.toString());
120                    }
121                    retry = false;
122
123                } catch (DeadlockException de) {
124                    System.out.println("################# " + getName() +
125                        " : caught deadlock");
126                    // retry if necessary
127                    if (retry_count < MAX_RETRY) {
128                        System.err.println(getName() +
129                            " : Retrying operation.");
130                        retry = true;
131                        retry_count++;
132                    } else {
133                        System.err.println(getName() +
134                            " : out of retries. Giving up.");
135                        retry = false;
136                    }
137                } catch (DatabaseException e) {
138                    // abort and don't retry
139                    retry = false;
140                    System.err.println(getName() +
141                        " : caught exception: " + e.toString());
142                    System.err.println(getName() +
143                        " : errno: " + e.getErrno());
144                    e.printStackTrace();
145                } finally {
146                    if (txn != null) {
147                        try {
148                            txn.abort();
149                        } catch (Exception e) {
150                            System.err.println("Error aborting transaction: " +
151                                e.toString());
152                            e.printStackTrace();
153                        }
154                    }
155                }
156            }
157        }
158    }
159
160    // This simply counts the number of records contained in the
161    // database and returns the result. You can use this method
162    // in three ways:
163    //
164    // First call it with an active txn handle.
165    //
166    // Secondly, configure the cursor for dirty reads
167    //
168    // Third, call countRecords AFTER the writer has committed
169    //    its transaction.
170    //
171    // If you do none of these things, the writer thread will
172    // self-deadlock.
173    //
174    // Note that this method exists only for illustrative purposes.
175    // A more straight-forward way to count the number of records in
176    // a database is to use the Database.getStats() method.
177    private int countRecords(Transaction txn)  throws DatabaseException {
178        DatabaseEntry key = new DatabaseEntry();
179        DatabaseEntry data = new DatabaseEntry();
180        int count = 0;
181        Cursor cursor = null;
182
183        try {
184            // Get the cursor
185            CursorConfig cc = new CursorConfig();
186            // setReadUncommitted is ignored if the database was not
187            // opened for uncommitted read support. TxnGuide opens
188            // its database in this way, TxnGuideInMemory does not.
189            cc.setReadUncommitted(true);
190            cursor = myDb.openCursor(txn, cc);
191            while (cursor.getNext(key, data, LockMode.DEFAULT) ==
192                    OperationStatus.SUCCESS) {
193
194                    count++;
195            }
196        } finally {
197            if (cursor != null) {
198                cursor.close();
199            }
200        }
201
202        return count;
203
204    }
205}
206