jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [02/11] jena git commit: Fix for snapshot-style promotion. With tests.
Date Wed, 17 Aug 2016 16:01:35 GMT
Fix for snapshot-style promotion. With tests.

Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/0142c3bc
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/0142c3bc
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/0142c3bc

Branch: refs/heads/master
Commit: 0142c3bce60808f6d7d35b482e204bb891e7d115
Parents: af5c292
Author: Andy Seaborne <andy@apache.org>
Authored: Fri Aug 12 14:24:42 2016 +0100
Committer: Andy Seaborne <andy@apache.org>
Committed: Sat Aug 13 15:08:01 2016 +0100

----------------------------------------------------------------------
 .../tdb/transaction/TransactionManager.java     |  55 ++--
 .../jena/tdb/transaction/TestTransPromote.java  | 307 ++++++++++++++-----
 2 files changed, 261 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/0142c3bc/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
index e4dcfae..1a65277 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/transaction/TransactionManager.java
@@ -326,19 +326,34 @@ public class TransactionManager
         if ( txn.getState() != TxnState.ACTIVE )
             throw new TDBTransactionException("promote: transaction is not active") ;
         
-        DatasetGraphTDB basedsg = txn.getBaseDataset() ;
-        // if read commiter - pick up any currentReaderView (last commited transaction)
-        if ( ! readCommited ) {
-            // Compare by object identity.
-            if ( currentReaderView.get() != basedsg )
-                throw new TDBTransactionException("Dataset changed - can't promote") ;
-        }
+        if ( readCommited ) {
+            // Read commit - pick up whatever is current at the point setup.  
+            // Need to go through begin for the writers lock. 
+            DatasetGraphTxn dsgtxn2 = begin( ReadWrite.WRITE, txn.getLabel()) ;
+            return dsgtxn2 ;
+        }           
+        
+        // Don't promote if the database has moved on.
+        // 1/ No active writers.
+        //    Ideally, wiait to see if it aborts but abort is uncommon. 
+        // Easy implementation -- if any active writers, don't promote.
+        if ( activeWriters.get() > 0 )
+            throw new TDBTransactionException("Dataset may be changing - active writer -
can't promote") ;
+//            // Would this block corrctly? ... drops the sync lock?
+//            acquireWriterLock(true) ;
+
+        // 2/ Check the database view has not moved on.
+        DatasetGraphTDB current  = determineBaseDataset() ;
+        DatasetGraphTDB starting = txn.getBaseDataset() ;
+        // Compare by object identity.
+        if ( current != starting )
+            throw new TDBTransactionException("Dataset changed - can't promote") ;
         
         // Need to go through begin for the writers lock. 
         DatasetGraphTxn dsgtxn2 = begin( ReadWrite.WRITE, txn.getLabel()) ;
         return dsgtxn2 ;
     }
-
+    
     // If DatasetGraphTransaction has a sync lock on sConn, this
     // does not need to be sync'ed. But it's possible to use some
     // of the low level object directly so we'll play safe.  
@@ -355,16 +370,7 @@ public class TransactionManager
                 case WRITE : System.out.print("w") ; break ;
             }
         
-        DatasetGraphTDB dsg = baseDataset ;
-        // *** But, if there are pending, committed transactions, use latest.
-        if ( !commitedAwaitingFlush.isEmpty() ) {
-            if ( DEBUG )
-                System.out.print(commitedAwaitingFlush.size()) ;
-            dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView()
;
-        } else {
-            if ( DEBUG )
-                System.out.print('_') ;
-        }
+        DatasetGraphTDB dsg = determineBaseDataset() ;
         Transaction txn = createTransaction(dsg, mode, label) ;
         
         log("begin$", txn) ;
@@ -389,6 +395,19 @@ public class TransactionManager
         return dsgTxn ;
     }
     
+    private DatasetGraphTDB determineBaseDataset() {
+    //      if ( DEBUG ) {
+    //          if ( !commitedAwaitingFlush.isEmpty() )
+    //              System.out.print(commitedAwaitingFlush.size()) ;
+    //      } else {
+    //          System.out.print('_') ;
+    //      }
+          DatasetGraphTDB dsg = baseDataset ;
+          // But, if there are pending, committed transactions, use latest.
+          if ( !commitedAwaitingFlush.isEmpty() )
+              dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView()
;
+          return dsg ;
+      }
     private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label)
{
         Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label,
this) ;
         return txn ;

http://git-wip-us.apache.org/repos/asf/jena/blob/0142c3bc/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
index e51cc29..f843e76 100644
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.jena.tdb.transaction;
+package org.apache.jena.tdb.transaction ;
 
-import static org.junit.Assert.* ;
+import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.fail ;
 
 import java.util.concurrent.Semaphore ;
 import java.util.concurrent.atomic.AtomicInteger ;
@@ -30,158 +31,298 @@ import org.apache.jena.sparql.core.Quad ;
 import org.apache.jena.sparql.sse.SSE ;
 import org.apache.jena.system.ThreadTxn ;
 import org.apache.jena.system.Txn ;
+import org.apache.jena.tdb.TDB ;
 import org.apache.jena.tdb.TDBFactory ;
 import org.apache.jena.tdb.sys.SystemTDB ;
-import org.apache.jena.tdb.transaction.DatasetGraphTransaction ;
 import org.apache.log4j.Level ;
 import org.apache.log4j.Logger ;
-import org.junit.AfterClass ;
-import org.junit.BeforeClass ;
-import org.junit.Test ;
+import org.junit.* ;
 
-/** Tests for transactions that start read and then promote to write */ 
+/** Tests for transactions that start read and then promote to write */
 public class TestTransPromote {
 
     // Currently,
-    //    this feature is off and needs enabling via DatasetGraphTransaction.promotion
-    //    promotiion is implicit whe a write happens.  
-    
-    
-    
+    // this feature is off and needs enabling via DatasetGraphTransaction.promotion
+    // promotiion is implicit whe a write happens.
+
     // See beforeClass / afterClass.
-    
-    private static Logger logger = Logger.getLogger(SystemTDB.errlog.getName()) ;
-    private static Level  level ;
-    static boolean oldPromotion ;
-    
-    @BeforeClass static public void beforeClass() {
-        oldPromotion = DatasetGraphTransaction.promotion ;
-        DatasetGraphTransaction.promotion = true ;
-        level  = logger.getLevel() ;
-        //logger.setLevel(Level.ERROR) ;
+
+    private static Logger logger1 = Logger.getLogger(SystemTDB.errlog.getName()) ;
+    private static Level  level1 ;
+    private static Logger logger2 = Logger.getLogger(TDB.logInfoName) ;
+    private static Level  level2 ;
+    static boolean        stdPromotion ;
+    static boolean        stdReadCommitted ;
+
+    @BeforeClass
+    static public void beforeClass() {
+        stdPromotion = DatasetGraphTransaction.promotion ;
+        stdReadCommitted = DatasetGraphTransaction.readCommittedPromotion ;
+        level1 = logger1.getLevel() ;
+        level2 = logger2.getLevel() ;
+        
+        // logger1.setLevel(Level.ERROR) ;
+        // logger2.setLevel(Level.ERROR) ;
     }
-    
-    @AfterClass static public void afterClass() {
+
+    @AfterClass
+    static public void afterClass() {
         // Restore logging setting.
-        logger.setLevel(level); 
-        DatasetGraphTransaction.promotion = oldPromotion ;
+        logger2.setLevel(level2) ;
+        logger1.setLevel(level1) ;
+        DatasetGraphTransaction.promotion = stdPromotion ;
+        DatasetGraphTransaction.readCommittedPromotion = stdReadCommitted ;
+    }
+
+    @Before
+    public void before() {
+        DatasetGraphTransaction.promotion = true ;
+        DatasetGraphTransaction.readCommittedPromotion = true ;
+    }
+
+    @After
+    public void after() {
+        DatasetGraphTransaction.promotion = true ;
+        DatasetGraphTransaction.readCommittedPromotion = true ;
     }
     
+    
     private static Quad q1 = SSE.parseQuad("(_ :s :p1 1)") ;
     private static Quad q2 = SSE.parseQuad("(_ :s :p2 2)") ;
     private static Quad q3 = SSE.parseQuad("(_ :s :p3 3)") ;
-    
-    protected DatasetGraph create() { return TDBFactory.createDatasetGraph() ; } 
-    
+
+    protected DatasetGraph create() {
+        return TDBFactory.createDatasetGraph() ;
+    }
+
     protected static void assertCount(long expected, DatasetGraph dsg) {
-        dsg.begin(ReadWrite.READ);
+        dsg.begin(ReadWrite.READ) ;
         long x = Iter.count(dsg.find()) ;
         dsg.end() ;
         assertEquals(expected, x) ;
     }
+
+    // "strict" = don't see intermedioate changes.
+    // "readCommitted" = do see
+
+    // Subclass / parameterized
     
-    @Test public void promote_01() {
+    @Test public void promote_snapshot_01()         { run_01(false) ; }
+    @Test public void promote_readCommitted_01()    { run_01(true) ; }
+    
+    // READ-add
+    private void run_01(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
-        dsg.begin(ReadWrite.READ); 
+        
+        dsg.begin(ReadWrite.READ) ;
         dsg.add(q1) ;
-        dsg.commit();
+        dsg.commit() ;
         dsg.end() ;
     }
     
-    @Test public void promote_02() {
+    @Test public void promote_snapshot_02()         { run_02(false) ; }
+    @Test public void promote_readCommitted_02()    { run_02(true) ; }
+    
+    // Previous transaction then READ-add
+    private void run_02(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
-        dsg.begin(ReadWrite.READ); 
+        
+        dsg.begin(ReadWrite.READ) ;dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
         dsg.add(q1) ;
-        dsg.add(q2) ;
-        dsg.commit();
+        dsg.commit() ;
         dsg.end() ;
-        assertCount(2, dsg) ;
     }
+    
+    @Test public void promote_snapshot_03()         { run_03(false) ; }
+    @Test public void promote_readCommitted_03()    { run_03(true) ; }
 
-    // Causes the warning.
-    @Test public void promote_03() {
+    private void run_03(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
-        dsg.begin(ReadWrite.READ); 
+        
+        dsg.begin(ReadWrite.WRITE) ;dsg.commit() ; dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
         dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+    
+    @Test public void promote_snapshot_04()         { run_04(false) ; }
+    @Test public void promote_readCommitted_04()    { run_04(true) ; }
+
+    private void run_04(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
+        DatasetGraph dsg = create() ;
         
+        dsg.begin(ReadWrite.WRITE) ;dsg.abort() ; dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+
+    @Test public void promote_snapshot_05()         { run_05(false) ; }
+    @Test public void promote_readCommitted_05()    { run_05(true) ; }
+    
+    private void run_05(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
+        DatasetGraph dsg = create() ;
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+
         // bad - forced abort.
         // Causes a WARN.
-        logger.setLevel(Level.ERROR) ;
+        logger1.setLevel(Level.ERROR) ;
         dsg.end() ;
-        logger.setLevel(level)  ;
-        
+        logger1.setLevel(level1) ;
+
         assertCount(0, dsg) ;
     }
+
+    @Test public void promote_snapshot_06()         { run_06(false) ; }
+    @Test public void promote_readCommitted_06()    { run_06(true) ; }
     
-    @Test public void promote_04() {
+    // Async writer after promotion.
+    private void run_06(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
         AtomicInteger a = new AtomicInteger(0) ;
-        
+
         Semaphore sema = new Semaphore(0) ;
-        Thread t = new Thread(()->{
-            sema.release();
-            Txn.execWrite(dsg, ()->dsg.add(q3)) ;   
-            sema.release();
+        Thread t = new Thread(() -> {
+            sema.release() ;
+            Txn.execWrite(dsg, () -> dsg.add(q3)) ;
+            sema.release() ;
         }) ;
-        
-        dsg.begin(ReadWrite.READ);
+
+        dsg.begin(ReadWrite.READ) ;
         // Promote
         dsg.add(q1) ;
-        t.start(); 
+        t.start() ;
         // First release.
-        sema.acquireUninterruptibly();
-        // Thread blocked. 
+        sema.acquireUninterruptibly() ;
+        // Thread blocked.
         dsg.add(q2) ;
-        dsg.commit();
+        dsg.commit() ;
         dsg.end() ;
-        
+
         // Until thread exits.
-        sema.acquireUninterruptibly();
+        sema.acquireUninterruptibly() ;
         assertCount(3, dsg) ;
     }
+
+    @Test public void promote_snapshot_07()         { run_07(false) ; }
+    @Test public void promote_readCommitted_07()    { run_07(true) ; }
     
-    @Test public void promote_05() {
+    // Async writer after promotion.
+    private void run_07(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
         // Start long running reader.
-        ThreadTxn tt = ThreadTxn.threadTxnRead(dsg, ()->{
+        ThreadTxn tt = ThreadTxn.threadTxnRead(dsg, () -> {
             long x = Iter.count(dsg.find()) ;
-            if ( x != 0 ) 
+            if ( x != 0 )
                 throw new RuntimeException() ;
         }) ;
-    
+
         // Start R->W here
-        dsg.begin(ReadWrite.READ); 
+        dsg.begin(ReadWrite.READ) ;
         dsg.add(q1) ;
         dsg.add(q2) ;
-        dsg.commit();
+        dsg.commit() ;
         dsg.end() ;
-        tt.run();
+        tt.run() ;
     }
     
-    @Test public void promote_06() {
-        promoteRC(true) ;
-    }
-        
-    @Test(expected=TDBTransactionException.class)
-    public void promote_07() {
-        promoteRC(false) ;
-    }
-     
-    private void promoteRC(boolean allowReadCommitted) {
-        DatasetGraphTransaction.readCommittedPromotion = allowReadCommitted ;    
+    @Test public void promote_snapshot_08()         { run_08(false) ; }
+    @Test public void promote_readCommitted_08()    { run_08(true) ; }
+    
+    // Async writer after promotion trasnaction ends.
+    private void run_08(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
         DatasetGraph dsg = create() ;
+        // Start R->W here
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.add(q2) ;
+        dsg.commit() ;
+        dsg.end() ;
+        Txn.execRead(dsg, () -> {
+            long x = Iter.count(dsg.find()) ;
+            assertEquals(2, x) ;
+        }) ;
+    }
+
+    // Tests for XXX Read-committed yes/no, and whether the other transaction commits or
aborts.
+    
+    @Test
+    public void promote_10() { promote_readCommit_txnCommit(true, true) ; }
+
+    @Test
+    public void promote_11() { promote_readCommit_txnCommit(true, false) ; }
+    
+    @Test(expected = TDBTransactionException.class)
+    public void promote_12() { promote_readCommit_txnCommit(false, true) ; }
+
+    @Test
+    public void promote_13() { promote_readCommit_txnCommit(false, false) ; }
 
-        ThreadTxn tt = ThreadTxn.threadTxnWrite(dsg, ()->{dsg.add(q3) ;}) ;
+    private void promote_readCommit_txnCommit(boolean allowReadCommitted, boolean asyncCommit)
{
+        logger2.setLevel(Level.ERROR);
+        DatasetGraphTransaction.readCommittedPromotion = allowReadCommitted ;
+        DatasetGraph dsg = create() ;
         
-        dsg.begin(ReadWrite.READ);
+        ThreadTxn tt = asyncCommit?
+            ThreadTxn.threadTxnWrite(dsg, () -> dsg.add(q3) ) :
+            ThreadTxn.threadTxnWriteAbort(dsg, () -> dsg.add(q3)) ;
+
+        dsg.begin(ReadWrite.READ) ;
         // Other runs
-        tt.run(); 
-        // Can  promote if readCommited
+        tt.run() ;
+        // Can promote if readCommited
         // Can't promote if not readCommited
         dsg.add(q1) ;
-        assertTrue(dsg.contains(q3)) ;
-        dsg.commit();
+        if ( ! allowReadCommitted && asyncCommit )
+            fail("Should not be here") ;
+        
+        assertEquals(asyncCommit, dsg.contains(q3)) ;
+        dsg.commit() ;
+        dsg.end() ;
+        logger2.setLevel(level2);
+    }
+    
+    // Test whether a writer casuses a snapshot isolation
+    // promotion to fail like it should
+    @Test(expected=TDBTransactionException.class)
+    public void promote_clash_1() { 
+        DatasetGraphTransaction.readCommittedPromotion = false ;
+        DatasetGraph dsg = create() ;
+        Semaphore sema1 = new Semaphore(0) ;
+        Semaphore sema2 = new Semaphore(0) ;
+        Runnable r = ()->{
+            dsg.begin(ReadWrite.WRITE) ; 
+            sema1.release(1); 
+            sema2.acquireUninterruptibly(1) ;
+            dsg.commit() ; 
+            dsg.end() ; 
+        } ;
+        
+        // Create a writer that waits.
+        new Thread(r).start(); 
+        sema1.acquireUninterruptibly(); 
+        // The thread is in the write.
+        dsg.begin(ReadWrite.READ) ;
+        // If read commited this will block.
+        // If snapshot, this will though an exception due to the active writer.
+        dsg.add(q1) ;
+        fail("Should not be here") ;
+        dsg.commit() ;
         dsg.end() ;
     }
-
 }


Mime
View raw message