1package persist.txn; 2 3import com.sleepycat.db.CursorConfig; 4import com.sleepycat.db.DatabaseException; 5import com.sleepycat.db.DeadlockException; 6import com.sleepycat.db.Environment; 7import com.sleepycat.db.Transaction; 8 9import com.sleepycat.persist.EntityCursor; 10import com.sleepycat.persist.EntityStore; 11import com.sleepycat.persist.PrimaryIndex; 12 13import java.util.Iterator; 14import java.util.Random; 15import java.io.UnsupportedEncodingException; 16 17public class StoreWriter extends Thread 18{ 19 private EntityStore myStore = null; 20 private Environment myEnv = null; 21 private PrimaryIndex<Integer,PayloadDataEntity> pdKey; 22 private Random generator = new Random(); 23 private boolean passTxn = false; 24 25 26 private static final int MAX_RETRY = 20; 27 28 // Constructor. Get our handles from here 29 StoreWriter(Environment env, EntityStore store) 30 31 throws DatabaseException { 32 myStore = store; 33 myEnv = env; 34 35 // Open the data accessor. This is used to store persistent 36 // objects. 37 pdKey = myStore.getPrimaryIndex(Integer.class, 38 PayloadDataEntity.class); 39 } 40 41 // Thread method that writes a series of objects 42 // to the store using transaction protection. 43 // Deadlock handling is demonstrated here. 44 public void run () { 45 Transaction txn = null; 46 47 // Perform 50 transactions 48 for (int i=0; i<50; i++) { 49 50 boolean retry = true; 51 int retry_count = 0; 52 // while loop is used for deadlock retries 53 while (retry) { 54 // try block used for deadlock detection and 55 // general exception handling 56 try { 57 58 // Get a transaction 59 txn = myEnv.beginTransaction(null, null); 60 61 // Write 10 PayloadDataEntity objects to the 62 // store for each transaction 63 for (int j = 0; j < 10; j++) { 64 // Instantiate an object 65 PayloadDataEntity pd = new PayloadDataEntity(); 66 67 // Set the Object ID. This is used as the primary key. 68 pd.setID(i + j); 69 70 // The thread name is used as a secondary key, and 71 // it is retrieved by this class's getName() method. 72 pd.setThreadName(getName()); 73 74 // The last bit of data that we use is a double 75 // that we generate randomly. This data is not 76 // indexed. 77 pd.setDoubleData(generator.nextDouble()); 78 79 // Do the put 80 pdKey.put(txn, pd); 81 } 82 83 // commit 84 System.out.println(getName() + " : committing txn : " + i); 85 System.out.println(getName() + " : Found " + 86 countObjects(txn) + " objects in the store."); 87 try { 88 txn.commit(); 89 txn = null; 90 } catch (DatabaseException e) { 91 System.err.println("Error on txn commit: " + 92 e.toString()); 93 } 94 retry = false; 95 96 } catch (DeadlockException de) { 97 System.out.println("################# " + getName() + 98 " : caught deadlock"); 99 // retry if necessary 100 if (retry_count < MAX_RETRY) { 101 System.err.println(getName() + 102 " : Retrying operation."); 103 retry = true; 104 retry_count++; 105 } else { 106 System.err.println(getName() + 107 " : out of retries. Giving up."); 108 retry = false; 109 } 110 } catch (DatabaseException e) { 111 // abort and don't retry 112 retry = false; 113 System.err.println(getName() + 114 " : caught exception: " + e.toString()); 115 System.err.println(getName() + 116 " : errno: " + e.getErrno()); 117 e.printStackTrace(); 118 } finally { 119 if (txn != null) { 120 try { 121 txn.abort(); 122 } catch (Exception e) { 123 System.err.println("Error aborting transaction: " + 124 e.toString()); 125 e.printStackTrace(); 126 } 127 } 128 } 129 } 130 } 131 } 132 133 // This simply counts the number of objects contained in the 134 // store and returns the result. You can use this method 135 // in three ways: 136 // 137 // First call it with an active txn handle. 138 // 139 // Secondly, configure the cursor for dirty reads 140 // 141 // Third, call countObjects AFTER the writer has committed 142 // its transaction. 143 // 144 // If you do none of these things, the writer thread will 145 // self-deadlock. 146 private int countObjects(Transaction txn) throws DatabaseException { 147 int count = 0; 148 149 CursorConfig cc = new CursorConfig(); 150 // This is ignored if the store is not opened with uncommitted read 151 // support. 152 cc.setReadUncommitted(true); 153 EntityCursor<PayloadDataEntity> cursor = pdKey.entities(txn, cc); 154 155 try { 156 for (PayloadDataEntity pdi : cursor) { 157 count++; 158 } 159 } finally { 160 if (cursor != null) { 161 cursor.close(); 162 } 163 } 164 165 return count; 166 167 } 168} 169