jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [4/6] jena git commit: JENA-1223: Transaction promotion for TIM
Date Tue, 27 Sep 2016 20:11:46 GMT
JENA-1223: Transaction promotion for TIM


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/26109087
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/26109087
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/26109087

Branch: refs/heads/master
Commit: 2610908744464e1247b2896f7b26e78cadf56d84
Parents: 51175d2
Author: Andy Seaborne <andy@apache.org>
Authored: Tue Sep 20 19:24:27 2016 +0100
Committer: Andy Seaborne <andy@apache.org>
Committed: Tue Sep 20 19:24:27 2016 +0100

----------------------------------------------------------------------
 .../sparql/core/mem/DatasetGraphInMemory.java   |  60 ++-
 .../jena/sparql/core/mem/TS_DatasetTxnMem.java  |   3 +-
 .../mem/TestDatasetGraphInMemoryPromote.java    |  65 +++
 .../transaction/AbstractTestTransPromote.java   | 422 +++++++++++++++++++
 .../jena/tdb/transaction/TS_TransactionTDB.java |   2 +-
 .../jena/tdb/transaction/TestTransPromote.java  | 391 -----------------
 .../tdb/transaction/TestTransPromoteTDB.java    |  72 ++++
 7 files changed, 617 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java
b/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java
index 8af0734..d6dfdff 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java
@@ -27,10 +27,12 @@ import static org.apache.jena.sparql.util.graph.GraphUtils.triples2quadsDftGraph
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong ;
 import java.util.concurrent.locks.ReentrantLock ;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import org.apache.jena.atlas.lib.InternalErrorException ;
 import org.apache.jena.graph.Graph;
 import org.apache.jena.graph.Node;
 import org.apache.jena.graph.Triple;
@@ -42,9 +44,8 @@ import org.apache.jena.sparql.core.* ;
 import org.slf4j.Logger;
 
 /**
- * A {@link DatasetGraph} backed by an {@link QuadTable}. By default, this is a {@link HexTable}
designed for high-speed
- * in-memory operation.
- *
+ * A {@link DatasetGraph} backed by an {@link QuadTable}. By default, this is a
+ * {@link HexTable} designed for high-speed in-memory operation.
  */
 public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements Transactional
{
 
@@ -63,6 +64,13 @@ public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements
Tr
      * insures that they are made consistently.
      */
     private final ReentrantLock systemLock = new ReentrantLock(true);
+    
+    /**
+     * Dataset version.
+     * A write transaction increments this in commit.
+     */
+    private final AtomicLong generation = new AtomicLong(0) ;
+    private final ThreadLocal<Long> version = withInitial(() -> 0L);
 
     private final ThreadLocal<Boolean> isInTransaction = withInitial(() -> false);
 
@@ -133,6 +141,7 @@ public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements
Tr
         withLock(systemLock, () ->{
             quadsIndex().begin(readWrite);
             defaultGraph().begin(readWrite);
+            version.set(generation.get());
         }) ;
     }
     
@@ -147,6 +156,7 @@ public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements
Tr
     private void finishTransaction() {
         isInTransaction.remove();
         transactionType.remove();
+        version.remove();
         transactionLock.leaveCriticalSection();
     }
      
@@ -165,6 +175,12 @@ public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements
Tr
             defaultGraph().commit();
             quadsIndex().end();
             defaultGraph().end();
+
+            if ( transactionType().equals(WRITE) ) {
+                if ( version.get() != generation.get() )
+                    throw new InternalErrorException(String.format("Version=%d, Generation=%d",version.get(),generation.get()))
;
+                generation.incrementAndGet() ;
+            }
         } ) ;
     }
     
@@ -306,8 +322,42 @@ public class DatasetGraphInMemory extends DatasetGraphTriplesQuads implements
Tr
             } finally {
                 end();
             }
-        } else if (transactionType().equals(WRITE)) mutator.accept(payload);
-        else throw new JenaTransactionException("Tried to write inside a READ transaction!");
+            return ;
+        }
+        if ( !transactionType().equals(WRITE) ) {
+            if ( ! promotion )
+                throw new JenaTransactionException("Tried to write inside a READ transaction!");
+            promote(readCommittedPromotion) ;
+        }
+        mutator.accept(payload);
+    }
+
+    /*private*/public/*for development*/ static boolean promotion               = false ;
+    /*private*/public/*for development*/ static boolean readCommittedPromotion  = true ;
+    
+    private void promote(boolean readCommited) {
+        //System.err.printf("Promote: version=%d generation=%d\n", version.get() , generation.get())
;
+        
+        // Outside lock.
+        if ( ! readCommited && version.get() != generation.get() )  {
+            // This tests for any commited writers since this transaction started.
+            // This does not catch the case of a currently active writer
+            // that has not gone to commit or abort yet.
+            // The final test is after we obtain the transactionLock.
+            throw new JenaTransactionException("Dataset changed - can't promote") ;
+        }
+       
+        // Blocking on other writers.
+        transactionLock.enterCriticalSection(false);
+        // Check again now we are inside the lock. 
+        if ( ! readCommited && version.get() != generation.get() )  {
+                // Can't promote - release the lock.
+                transactionLock.leaveCriticalSection();
+                throw new JenaTransactionException("Concurrent writer changed the dataset
: can't promote") ;
+            }
+        // We have the lock and we have promoted!
+        transactionType(WRITE);
+        _begin(WRITE) ;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TS_DatasetTxnMem.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TS_DatasetTxnMem.java
b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TS_DatasetTxnMem.java
index f3d795b..2fa0f60 100644
--- a/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TS_DatasetTxnMem.java
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TS_DatasetTxnMem.java
@@ -40,6 +40,7 @@ import org.junit.runners.Suite.SuiteClasses;
     
     TestDatasetGraphInMemoryFind.class,
     TestDatasetGraphInMemoryFindPattern.class,
-    TestDatasetGraphInMemoryIsolation.class
+    TestDatasetGraphInMemoryIsolation.class,
+    TestDatasetGraphInMemoryPromote.class
  })
 public class TS_DatasetTxnMem {}

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
new file mode 100644
index 0000000..dbcfde0
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/mem/TestDatasetGraphInMemoryPromote.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.core.mem ;
+
+import org.apache.jena.sparql.JenaTransactionException ;
+import org.apache.jena.sparql.core.DatasetGraph ;
+import org.apache.jena.sparql.transaction.AbstractTestTransPromote ;
+import org.apache.log4j.Logger ;
+
+/** Tests for transactions that start read and then promote to write -- TIM */
+public class TestDatasetGraphInMemoryPromote extends AbstractTestTransPromote {
+    public TestDatasetGraphInMemoryPromote() {
+        super(getLoggers()) ;
+    }
+
+    @Override
+    protected DatasetGraph create() {
+        return new DatasetGraphInMemory() ;
+    }
+
+    private static Logger[] getLoggers() {
+        return new Logger[]{ Logger.getLogger(DatasetGraphInMemory.class) } ;
+    }
+
+    @Override
+    protected void setPromotion(boolean b) {
+        DatasetGraphInMemory.promotion = b ;
+    }
+
+    @Override
+    protected boolean getPromotion() {
+        return DatasetGraphInMemory.promotion ;
+    }
+
+    @Override
+    protected void setReadCommitted(boolean b) {
+        DatasetGraphInMemory.readCommittedPromotion = b ;
+    }
+
+    @Override
+    protected boolean getReadCommitted() {
+        return DatasetGraphInMemory.readCommittedPromotion ;
+    }
+
+    @Override
+    protected Class<?> getTransactionExceptionClass() {
+        return JenaTransactionException.class ;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
new file mode 100644
index 0000000..f7c7fcb
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/transaction/AbstractTestTransPromote.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.transaction ;
+
+import static org.junit.Assert.assertEquals ;
+import static org.junit.Assert.fail ;
+
+import java.util.concurrent.* ;
+import java.util.concurrent.atomic.AtomicInteger ;
+
+import org.apache.jena.atlas.iterator.Iter ;
+import org.apache.jena.atlas.lib.Lib ;
+import org.apache.jena.query.ReadWrite ;
+import org.apache.jena.shared.JenaException ;
+import org.apache.jena.sparql.core.DatasetGraph ;
+import org.apache.jena.sparql.core.Quad ;
+import org.apache.jena.sparql.sse.SSE ;
+import org.apache.jena.system.ThreadAction ;
+import org.apache.jena.system.ThreadTxn ;
+import org.apache.jena.system.Txn ;
+import org.apache.log4j.Level ;
+import org.apache.log4j.Logger ;
+import org.junit.After ;
+import org.junit.Before ;
+import org.junit.Test ;
+
+/** Tests for transactions that start read and then promote to write */
+public abstract class AbstractTestTransPromote {
+
+    // Currently,
+    // this feature is off and needs enabling via setPromotion.
+    // promotion is implicit when a write happens.
+
+    // See beforeClass / afterClass.
+
+    // Loggers.
+    private final Logger[] loggers ;
+    private Level[] levels ;
+    private boolean stdPromotion ;
+    private boolean stdReadCommitted ;
+    
+    @Before
+    public void beforeLoggersNoWarnings() {
+        int N = loggers.length ;
+        levels = new Level[N] ;
+        for ( int i = 0 ; i < N ; i++ ) {
+            levels[i] = loggers[i].getLevel() ;
+            loggers[i].setLevel(Level.ERROR) ;
+        }
+    }
+
+    @After
+    public void afterResetLoggers() {
+        int N = loggers.length ;
+        for ( int i = 0 ; i < N ; i++ ) {
+            loggers[i].setLevel(levels[i]) ;
+        }
+    }
+
+    protected abstract void setPromotion(boolean b) ;
+    protected abstract boolean getPromotion() ;
+    protected abstract void setReadCommitted(boolean b) ;
+    protected abstract boolean getReadCommitted() ;
+    
+    // The exact class used by exceptions of the system under test.
+    // TDB transctions are in the TDBException hierarchy
+    // so can't be JenaTransactionException.
+    protected abstract Class<?> getTransactionExceptionClass() ;
+    
+    @Before
+    public void before() {
+        stdPromotion = getPromotion() ;
+        stdReadCommitted = getReadCommitted() ;
+        setPromotion(true);
+        setReadCommitted(true);
+    }
+
+    @After
+    public void after() {
+        setPromotion(stdPromotion);
+        setReadCommitted(stdReadCommitted);
+    }
+    
+    protected AbstractTestTransPromote(Logger[] loggers) {
+        this.loggers = loggers ;
+    }
+    
+    
+    private static Quad q1 = SSE.parseQuad("(_ :s :p1 1)") ;
+    private static Quad q2 = SSE.parseQuad("(_ :s :p2 2)") ;
+    private static Quad q3 = SSE.parseQuad("(_ :s :p3 3)") ;
+
+    protected abstract DatasetGraph create() ;
+
+    protected static void assertCount(long expected, DatasetGraph dsg) {
+        dsg.begin(ReadWrite.READ) ;
+        long x = Iter.count(dsg.find()) ;
+        dsg.end() ;
+        assertEquals(expected, x) ;
+    }
+
+    // "strict" = don't see intermedioate changes.
+    // "readCommitted" = do see
+
+    // Subclass / parameterized
+    
+    @Test public void promote_snapshot_01()         { run_01(false) ; }
+    @Test public void promote_readCommitted_01()    { run_01(true) ; }
+    
+    // READ-add
+    private void run_01(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+    
+    @Test public void promote_snapshot_02()         { run_02(false) ; }
+    @Test public void promote_readCommitted_02()    { run_02(true) ; }
+    
+    // Previous transaction then READ-add
+    private void run_02(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        
+        dsg.begin(ReadWrite.READ) ;dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+    
+    @Test public void promote_snapshot_03()         { run_03(false) ; }
+    @Test public void promote_readCommitted_03()    { run_03(true) ; }
+
+    private void run_03(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        
+        dsg.begin(ReadWrite.WRITE) ;dsg.commit() ; dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+    
+    @Test public void promote_snapshot_04()         { run_04(false) ; }
+    @Test public void promote_readCommitted_04()    { run_04(true) ; }
+
+    private void run_04(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        
+        dsg.begin(ReadWrite.WRITE) ;dsg.abort() ; dsg.end() ;
+        
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.commit() ;
+        dsg.end() ;
+    }
+
+    @Test public void promote_snapshot_05()         { run_05(false) ; }
+    @Test public void promote_readCommitted_05()    { run_05(true) ; }
+    
+    private void run_05(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+
+        // bad - forced abort.
+        // Causes a WARN.
+        //logger1.setLevel(Level.ERROR) ;
+        dsg.end() ;
+        //logger1.setLevel(level1) ;
+
+        assertCount(0, dsg) ;
+    }
+
+    @Test public void promote_snapshot_06()         { run_06(false) ; }
+    @Test public void promote_readCommitted_06()    { run_06(true) ; }
+    
+    // Async writer after promotion.
+    private void run_06(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        AtomicInteger a = new AtomicInteger(0) ;
+
+        Semaphore sema = new Semaphore(0) ;
+        Thread t = new Thread(() -> {
+            sema.release() ;
+            Txn.executeWrite(dsg, () -> dsg.add(q3)) ;
+            sema.release() ;
+        }) ;
+
+        dsg.begin(ReadWrite.READ) ;
+        // Promote
+        dsg.add(q1) ;
+        t.start() ;
+        // First release.
+        sema.acquireUninterruptibly() ;
+        // Thread blocked.
+        dsg.add(q2) ;
+        dsg.commit() ;
+        dsg.end() ;
+
+        // Until thread exits.
+        sema.acquireUninterruptibly() ;
+        assertCount(3, dsg) ;
+    }
+
+    @Test public void promote_snapshot_07()         { run_07(false) ; }
+    @Test public void promote_readCommitted_07()    { run_07(true) ; }
+    
+    // Async writer after promotion.
+    private void run_07(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        // Start long running reader.
+        ThreadAction tt = ThreadTxn.threadTxnRead(dsg, () -> {
+            long x = Iter.count(dsg.find()) ;
+            if ( x != 0 )
+                throw new RuntimeException() ;
+        }) ;
+
+        // Start R->W here
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.add(q2) ;
+        dsg.commit() ;
+        dsg.end() ;
+        tt.run() ;
+    }
+    
+    @Test public void promote_snapshot_08()         { run_08(false) ; }
+    @Test public void promote_readCommitted_08()    { run_08(true) ; }
+    
+    // Async writer after promotion trasnaction ends.
+    private void run_08(boolean allowReadCommitted) {
+        setReadCommitted(allowReadCommitted);
+        DatasetGraph dsg = create() ;
+        // Start R->W here
+        dsg.begin(ReadWrite.READ) ;
+        dsg.add(q1) ;
+        dsg.add(q2) ;
+        dsg.commit() ;
+        dsg.end() ;
+        Txn.executeRead(dsg, () -> {
+            long x = Iter.count(dsg.find()) ;
+            assertEquals(2, x) ;
+        }) ;
+    }
+
+    // Tests for XXX Read-committed yes/no (false = snapshot isolation, true = read committed),
+    // and whether the other transaction commits (true) or aborts (false).
+    
+    @Test
+    public void promote_10() { promote_readCommit_txnCommit(true, true) ; }
+
+    @Test
+    public void promote_11() { promote_readCommit_txnCommit(true, false) ; }
+    
+    @Test
+    public void promote_12() { 
+        expect(()->promote_readCommit_txnCommit(false, true) ,
+               getTransactionExceptionClass()) ;
+    }
+    
+    private void expect(Runnable runnable, Class<?>...classes) {
+        try {
+            runnable.run(); 
+            fail("Exception expected") ;
+        } catch (Exception e) {
+            for ( Class<?> c : classes) {
+                if ( e.getClass().equals(c) )
+                    return ;
+            }
+            throw e ;
+        }
+    }
+
+    @Test
+    public void promote_13() { promote_readCommit_txnCommit(false, false) ; }
+
+    private void promote_readCommit_txnCommit(boolean allowReadCommitted, boolean asyncCommit)
{
+        setReadCommitted(allowReadCommitted) ;
+        DatasetGraph dsg = create() ;
+        
+        ThreadAction tt = asyncCommit?
+            ThreadTxn.threadTxnWrite(dsg, () -> dsg.add(q3) ) :
+            ThreadTxn.threadTxnWriteAbort(dsg, () -> dsg.add(q3)) ;
+
+        dsg.begin(ReadWrite.READ) ;
+        // Other runs
+        tt.run() ;
+        // Can promote if readCommited
+        // Can't promote if not readCommited
+        dsg.add(q1) ;
+        if ( ! allowReadCommitted && asyncCommit )
+            fail("Should not be here") ;
+        
+        assertEquals(asyncCommit, dsg.contains(q3)) ;
+        dsg.commit() ;
+        dsg.end() ;
+        //logger2.setLevel(level2);
+    }
+    
+    // Active writer commits -> no promotion.
+    @Test
+    public void promote_active_writer_1() throws InterruptedException, ExecutionException
{
+        expect(()->promote_active_writer(true) ,
+               getTransactionExceptionClass()) ;
+    }
+    
+    // Active writer aborts -> promotion.
+    @Test
+    public void promote_active_writer_2() throws InterruptedException, ExecutionException
{
+        // Active writer aborts -> promotion possible (but not implemented that way).
+        promote_active_writer(false) ;
+    }
+    
+    private void promote_active_writer(boolean activeWriterCommit) {
+        ExecutorService executor = Executors.newFixedThreadPool(2) ;
+        try {
+            promote_clash_active_writer(executor, activeWriterCommit) ;
+        }
+        finally {
+            executor.shutdown() ;
+        }
+    }
+    
+    private void promote_clash_active_writer(ExecutorService executor, boolean activeWriterCommit)
{
+        setReadCommitted(false) ;
+        Semaphore semaActiveWriterStart     = new Semaphore(0) ;
+        Semaphore semaActiveWriterContinue  = new Semaphore(0) ;
+        Semaphore semaPromoteTxnStart       = new Semaphore(0) ;
+        Semaphore semaPromoteTxnContinue    = new Semaphore(0) ;
+
+        DatasetGraph dsg = create() ;
+
+        // The "active writer". 
+        Callable<Object> activeWriter = ()->{
+            dsg.begin(ReadWrite.WRITE) ;
+            semaActiveWriterStart.release(1) ;
+            // (*1)
+            semaActiveWriterContinue.acquireUninterruptibly(1) ;
+            if ( activeWriterCommit )
+                dsg.commit() ;
+            else
+                dsg.abort();
+            dsg.end() ;
+            return null ;
+        } ;
+
+        Future<Object> activeWriterFuture = executor.submit(activeWriter) ;
+        // Advance "active writer" to (*1), inside a write transaction and waiting.
+        // The transaction has been created and started.
+        semaActiveWriterStart.acquireUninterruptibly(); 
+
+        Callable<JenaException> attemptedPromote = ()->{
+            dsg.begin(ReadWrite.READ) ;
+            semaPromoteTxnStart.release(1) ;
+            // (*2)
+            semaPromoteTxnContinue.acquireUninterruptibly();
+            try { 
+                // (*3)
+                dsg.add(q1) ;
+                return null ;
+            } catch (JenaException e) {
+                Class<?> c = getTransactionExceptionClass() ;
+                if ( ! e.getClass().equals(c) ) 
+                    throw e ;
+                return e ;
+            }
+        } ;
+
+        Future<JenaException> attemptedPromoteFuture = executor.submit(attemptedPromote)
;
+        // Advance "attempted promote" to (*2), inside a read transaction, before attempting
a promoting write.
+        // The transaction has been created and started.
+        semaPromoteTxnStart.acquireUninterruptibly();
+        
+        // Advance "attempted promote" allowing it to go (*3) where it blocks
+        // This may happen at any time - as soon as it does, the "attempted promote" blocks.
+        semaPromoteTxnContinue.release(1);
+        // I don't know of a better way to ensure "attempted promote" is blocked. 
+        
+        Lib.sleep(100) ;
+        // Let the active writer go.
+        semaActiveWriterContinue.release(1);
+        
+        try { 
+            // Collect the active writer.
+            activeWriterFuture.get();
+            
+            // (Ideal) and the attempted promotion should advance if the active writer aborts.
+            JenaException e = attemptedPromoteFuture.get() ;
+            if ( e != null )
+                throw e ;
+        } catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1)
; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TS_TransactionTDB.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TS_TransactionTDB.java
b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TS_TransactionTDB.java
index e1f8af4..87bbf1e 100644
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TS_TransactionTDB.java
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TS_TransactionTDB.java
@@ -41,7 +41,7 @@ import org.junit.runners.Suite ;
     , TestTransactionUnionGraph.class
     , TestMiscTDB.class
     , TestTDBInternal.class
-    , TestTransPromote.class
+    , TestTransPromoteTDB.class
     , TestTransControl.class
     , TestTransIsolation.class
 })

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
deleted file mode 100644
index 5e62924..0000000
--- a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromote.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.tdb.transaction ;
-
-import static org.junit.Assert.assertEquals ;
-import static org.junit.Assert.fail ;
-
-import java.util.concurrent.* ;
-import java.util.concurrent.atomic.AtomicInteger ;
-
-import org.apache.jena.atlas.iterator.Iter ;
-import org.apache.jena.atlas.lib.Lib ;
-import org.apache.jena.query.ReadWrite ;
-import org.apache.jena.sparql.core.DatasetGraph ;
-import org.apache.jena.sparql.core.Quad ;
-import org.apache.jena.sparql.sse.SSE ;
-import org.apache.jena.system.ThreadAction ;
-import org.apache.jena.system.ThreadTxn ;
-import org.apache.jena.system.Txn ;
-import org.apache.jena.tdb.TDB ;
-import org.apache.jena.tdb.TDBFactory ;
-import org.apache.jena.tdb.sys.SystemTDB ;
-import org.apache.log4j.Level ;
-import org.apache.log4j.Logger ;
-import org.junit.* ;
-
-/** Tests for transactions that start read and then promote to write */
-public class TestTransPromote {
-
-    // Currently,
-    // this feature is off and needs enabling via DatasetGraphTransaction.promotion
-    // promotion is implicit when a write happens.
-
-    // See beforeClass / afterClass.
-
-    private static Logger logger1 = Logger.getLogger(SystemTDB.errlog.getName()) ;
-    private static Level  level1 ;
-    private static Logger logger2 = Logger.getLogger(TDB.logInfoName) ;
-    private static Level  level2 ;
-    static boolean        stdPromotion ;
-    static boolean        stdReadCommitted ;
-
-    @BeforeClass
-    static public void beforeClass() {
-        stdPromotion = DatasetGraphTransaction.promotion ;
-        stdReadCommitted = DatasetGraphTransaction.readCommittedPromotion ;
-        level1 = logger1.getLevel() ;
-        level2 = logger2.getLevel() ;
-    }
-
-    @AfterClass
-    static public void afterClass() {
-        // Restore logging setting.
-        logger2.setLevel(level2) ;
-        logger1.setLevel(level1) ;
-        DatasetGraphTransaction.promotion = stdPromotion ;
-        DatasetGraphTransaction.readCommittedPromotion = stdReadCommitted ;
-    }
-
-    @Before
-    public void before() {
-        DatasetGraphTransaction.promotion = true ;
-        DatasetGraphTransaction.readCommittedPromotion = true ;
-    }
-
-    @After
-    public void after() {
-        DatasetGraphTransaction.promotion = true ;
-        DatasetGraphTransaction.readCommittedPromotion = true ;
-    }
-    
-    
-    private static Quad q1 = SSE.parseQuad("(_ :s :p1 1)") ;
-    private static Quad q2 = SSE.parseQuad("(_ :s :p2 2)") ;
-    private static Quad q3 = SSE.parseQuad("(_ :s :p3 3)") ;
-
-    protected DatasetGraph create() {
-        return TDBFactory.createDatasetGraph() ;
-    }
-
-    protected static void assertCount(long expected, DatasetGraph dsg) {
-        dsg.begin(ReadWrite.READ) ;
-        long x = Iter.count(dsg.find()) ;
-        dsg.end() ;
-        assertEquals(expected, x) ;
-    }
-
-    // "strict" = don't see intermedioate changes.
-    // "readCommitted" = do see
-
-    // Subclass / parameterized
-    
-    @Test public void promote_snapshot_01()         { run_01(false) ; }
-    @Test public void promote_readCommitted_01()    { run_01(true) ; }
-    
-    // READ-add
-    private void run_01(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.commit() ;
-        dsg.end() ;
-    }
-    
-    @Test public void promote_snapshot_02()         { run_02(false) ; }
-    @Test public void promote_readCommitted_02()    { run_02(true) ; }
-    
-    // Previous transaction then READ-add
-    private void run_02(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        
-        dsg.begin(ReadWrite.READ) ;dsg.end() ;
-        
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.commit() ;
-        dsg.end() ;
-    }
-    
-    @Test public void promote_snapshot_03()         { run_03(false) ; }
-    @Test public void promote_readCommitted_03()    { run_03(true) ; }
-
-    private void run_03(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        
-        dsg.begin(ReadWrite.WRITE) ;dsg.commit() ; dsg.end() ;
-        
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.commit() ;
-        dsg.end() ;
-    }
-    
-    @Test public void promote_snapshot_04()         { run_04(false) ; }
-    @Test public void promote_readCommitted_04()    { run_04(true) ; }
-
-    private void run_04(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        
-        dsg.begin(ReadWrite.WRITE) ;dsg.abort() ; dsg.end() ;
-        
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.commit() ;
-        dsg.end() ;
-    }
-
-    @Test public void promote_snapshot_05()         { run_05(false) ; }
-    @Test public void promote_readCommitted_05()    { run_05(true) ; }
-    
-    private void run_05(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-
-        // bad - forced abort.
-        // Causes a WARN.
-        logger1.setLevel(Level.ERROR) ;
-        dsg.end() ;
-        logger1.setLevel(level1) ;
-
-        assertCount(0, dsg) ;
-    }
-
-    @Test public void promote_snapshot_06()         { run_06(false) ; }
-    @Test public void promote_readCommitted_06()    { run_06(true) ; }
-    
-    // Async writer after promotion.
-    private void run_06(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        AtomicInteger a = new AtomicInteger(0) ;
-
-        Semaphore sema = new Semaphore(0) ;
-        Thread t = new Thread(() -> {
-            sema.release() ;
-            Txn.executeWrite(dsg, () -> dsg.add(q3)) ;
-            sema.release() ;
-        }) ;
-
-        dsg.begin(ReadWrite.READ) ;
-        // Promote
-        dsg.add(q1) ;
-        t.start() ;
-        // First release.
-        sema.acquireUninterruptibly() ;
-        // Thread blocked.
-        dsg.add(q2) ;
-        dsg.commit() ;
-        dsg.end() ;
-
-        // Until thread exits.
-        sema.acquireUninterruptibly() ;
-        assertCount(3, dsg) ;
-    }
-
-    @Test public void promote_snapshot_07()         { run_07(false) ; }
-    @Test public void promote_readCommitted_07()    { run_07(true) ; }
-    
-    // Async writer after promotion.
-    private void run_07(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        // Start long running reader.
-        ThreadAction tt = ThreadTxn.threadTxnRead(dsg, () -> {
-            long x = Iter.count(dsg.find()) ;
-            if ( x != 0 )
-                throw new RuntimeException() ;
-        }) ;
-
-        // Start R->W here
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.add(q2) ;
-        dsg.commit() ;
-        dsg.end() ;
-        tt.run() ;
-    }
-    
-    @Test public void promote_snapshot_08()         { run_08(false) ; }
-    @Test public void promote_readCommitted_08()    { run_08(true) ; }
-    
-    // Async writer after promotion trasnaction ends.
-    private void run_08(boolean b) {
-        DatasetGraphTransaction.readCommittedPromotion = b ;
-        DatasetGraph dsg = create() ;
-        // Start R->W here
-        dsg.begin(ReadWrite.READ) ;
-        dsg.add(q1) ;
-        dsg.add(q2) ;
-        dsg.commit() ;
-        dsg.end() ;
-        Txn.executeRead(dsg, () -> {
-            long x = Iter.count(dsg.find()) ;
-            assertEquals(2, x) ;
-        }) ;
-    }
-
-    // Tests for XXX Read-committed yes/no (false = snapshot isolation, true = read committed),
-    // and whether the other transaction commits (true) or aborts (false).
-    
-    @Test
-    public void promote_10() { promote_readCommit_txnCommit(true, true) ; }
-
-    @Test
-    public void promote_11() { promote_readCommit_txnCommit(true, false) ; }
-    
-    @Test(expected = TDBTransactionException.class)
-    public void promote_12() { promote_readCommit_txnCommit(false, true) ; }
-
-    @Test
-    public void promote_13() { promote_readCommit_txnCommit(false, false) ; }
-
-    private void promote_readCommit_txnCommit(boolean allowReadCommitted, boolean asyncCommit)
{
-        logger2.setLevel(Level.ERROR);
-        DatasetGraphTransaction.readCommittedPromotion = allowReadCommitted ;
-        DatasetGraph dsg = create() ;
-        
-        ThreadAction tt = asyncCommit?
-            ThreadTxn.threadTxnWrite(dsg, () -> dsg.add(q3) ) :
-            ThreadTxn.threadTxnWriteAbort(dsg, () -> dsg.add(q3)) ;
-
-        dsg.begin(ReadWrite.READ) ;
-        // Other runs
-        tt.run() ;
-        // Can promote if readCommited
-        // Can't promote if not readCommited
-        dsg.add(q1) ;
-        if ( ! allowReadCommitted && asyncCommit )
-            fail("Should not be here") ;
-        
-        assertEquals(asyncCommit, dsg.contains(q3)) ;
-        dsg.commit() ;
-        dsg.end() ;
-        logger2.setLevel(level2);
-    }
-    
-    // Active writer commits -> no promotion.
-    @Test(expected=TDBTransactionException.class)
-    public void promote_active_writer_1() throws InterruptedException, ExecutionException
{
-        promote_active_writer(true) ;
-    }
-    
-    // Active writer aborts -> promotion.
-    @Test
-    public void promote_active_writer_2() throws InterruptedException, ExecutionException
{
-        // Active writer aborts -> promotion possible (but not implemented that way).
-        promote_active_writer(false) ;
-    }
-    
-    private void promote_active_writer(boolean activeWriterCommit) {
-        ExecutorService executor = Executors.newFixedThreadPool(2) ;
-        try {
-            promote_clash_active_writer(executor, activeWriterCommit) ;
-        }
-        finally {
-            executor.shutdown() ;
-        }
-    }
-    
-    private void promote_clash_active_writer(ExecutorService executor, boolean activeWriterCommit)
{
-        DatasetGraphTransaction.readCommittedPromotion = false ;
-        Semaphore semaActiveWriterStart     = new Semaphore(0) ;
-        Semaphore semaActiveWriterContinue  = new Semaphore(0) ;
-        Semaphore semaPromoteTxnStart       = new Semaphore(0) ;
-        Semaphore semaPromoteTxnContinue    = new Semaphore(0) ;
-
-        DatasetGraph dsg = create() ;
-
-        // The "active writer". 
-        Callable<Object> activeWriter = ()->{
-            dsg.begin(ReadWrite.WRITE) ;
-            semaActiveWriterStart.release(1) ;
-            // (*1)
-            semaActiveWriterContinue.acquireUninterruptibly(1) ;
-            if ( activeWriterCommit )
-                dsg.commit() ;
-            else
-                dsg.abort();
-            dsg.end() ;
-            return null ;
-        } ;
-
-        Future<Object> activeWriterFuture = executor.submit(activeWriter) ;
-        // Advance "active writer" to (*1), inside a write transaction and waiting.
-        // The transaction has been created and started.
-        semaActiveWriterStart.acquireUninterruptibly(); 
-
-        Callable<TDBTransactionException> attemptedPromote = ()->{
-            dsg.begin(ReadWrite.READ) ;
-            semaPromoteTxnStart.release(1) ;
-            // (*2)
-            semaPromoteTxnContinue.acquireUninterruptibly();
-            try { 
-                // (*3)
-                dsg.add(q1) ;
-                //System.err.println("PROMOTED");
-                return null ;
-            } catch (TDBTransactionException e) {
-                //System.err.println("NOT PROMOTED");
-                return e ;
-            }
-        } ;
-
-        Future<TDBTransactionException> attemptedPromoteFuture = executor.submit(attemptedPromote)
;
-        // Advance "attempted promote" to (*2), inside a read transaction, before attempting
a promoting write.
-        // The transaction has been created and started.
-        semaPromoteTxnStart.acquireUninterruptibly();
-        
-        // Advance "attempted promote" allowing it to go (*3) where it blocks
-        // This may happen at any time - as soon as it does, the "attempted promote" blocks.
-        semaPromoteTxnContinue.release(1);
-        // I don't know of a better way to ensure "attempted promote" is blocked. 
-        
-        Lib.sleep(100) ;
-        // Let the active writer go.
-        semaActiveWriterContinue.release(1);
-        
-        try { 
-            // Collect the active writer.
-            activeWriterFuture.get();
-            
-            // (Ideal) and the attempted promotion should advance if the active writer aborts.
-            TDBTransactionException e = attemptedPromoteFuture.get() ;
-            if ( e != null )
-                throw e ;
-        } catch (InterruptedException | ExecutionException e1) { throw new RuntimeException(e1)
; }
-    }
-}

http://git-wip-us.apache.org/repos/asf/jena/blob/26109087/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
new file mode 100644
index 0000000..61ba5f1
--- /dev/null
+++ b/jena-tdb/src/test/java/org/apache/jena/tdb/transaction/TestTransPromoteTDB.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb.transaction ;
+
+import org.apache.jena.sparql.core.DatasetGraph ;
+import org.apache.jena.sparql.transaction.AbstractTestTransPromote ;
+import org.apache.jena.tdb.TDB ;
+import org.apache.jena.tdb.TDBFactory ;
+import org.apache.jena.tdb.sys.SystemTDB ;
+import org.apache.jena.tdb.transaction.DatasetGraphTransaction ;
+import org.apache.jena.tdb.transaction.TDBTransactionException ;
+import org.apache.log4j.Logger ;
+
+/** Tests for transactions that start read and then promote to write -- TDB */
+public class TestTransPromoteTDB extends AbstractTestTransPromote {
+    public TestTransPromoteTDB() {
+        super(getLoggers()) ;
+    }
+
+    @Override
+    protected DatasetGraph create() {
+        return TDBFactory.createDatasetGraph() ;
+    }
+
+    private static Logger[] getLoggers() {
+        return new Logger[]{
+            Logger.getLogger(SystemTDB.errlog.getName()),
+            Logger.getLogger(TDB.logInfoName)
+        } ;
+    }
+
+    @Override
+    protected void setPromotion(boolean b) {
+        DatasetGraphTransaction.promotion = b ;
+    }
+
+    @Override
+    protected boolean getPromotion() {
+        return DatasetGraphTransaction.promotion ;
+    }
+
+    @Override
+    protected void setReadCommitted(boolean b) {
+        DatasetGraphTransaction.readCommittedPromotion = b ;
+    }
+
+    @Override
+    protected boolean getReadCommitted() {
+        return DatasetGraphTransaction.readCommittedPromotion ;
+    }
+
+    @Override
+    protected Class<?> getTransactionExceptionClass() {
+        return TDBTransactionException.class ;
+    }
+}
\ No newline at end of file


Mime
View raw message