jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [026/100] [abbrv] [partial] jena git commit: JENA-1397: Rename modules and set versions
Date Thu, 28 Sep 2017 16:05:38 GMT
http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
new file mode 100644
index 0000000..8a866a9
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinator.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import static org.apache.jena.query.ReadWrite.WRITE ;
+import static org.seaborne.dboe.transaction.txn.journal.JournalEntryType.UNDO ;
+
+import java.nio.ByteBuffer ;
+import java.util.ArrayList ;
+import java.util.Iterator ;
+import java.util.List ;
+import java.util.Objects ;
+import java.util.concurrent.ConcurrentHashMap ;
+import java.util.concurrent.Semaphore ;
+import java.util.concurrent.atomic.AtomicLong ;
+import java.util.concurrent.locks.ReadWriteLock ;
+import java.util.concurrent.locks.ReentrantReadWriteLock ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.query.ReadWrite ;
+import org.seaborne.dboe.base.file.Location ;
+import org.seaborne.dboe.sys.Sys ;
+import org.seaborne.dboe.transaction.txn.journal.Journal ;
+import org.seaborne.dboe.transaction.txn.journal.JournalEntry ;
+import org.slf4j.Logger ;
+
+/**
+ * One {@code TransactionCoordinator} per group of {@link TransactionalComponent}s.
+ * {@link TransactionalComponent}s can not be shared across TransactionCoordinators.
+ * <p>
+ * This is a general engine although tested and most used for multiple-reader
+ * and single-writer (MR+SW). {@link TransactionalComponentLifecycle} provides the
+ * per-thread style.
+ * <p>
+ * Contrast to MRSW: multiple-reader or single-writer.
+ * <h3>Block writers</h3>
+ * Block until no writers are active.
+ * When this returns, this guarantees that the database is not changing
+ * and the journal is flushed to disk.
+ * <p>
+ * See {@link #blockWriters()}, {@link #enableWriters()}, {@link #execAsWriter(Runnable)}
+ * <h3>Exclusive mode</h3>
+ * Exclusive mode is when the current thread is the only active code : no readers, no writers.
+ * <p>
+ * See {@link #startExclusiveMode()}/{@link #tryExclusiveMode()} {@link #finishExclusiveMode()}, {@link #execExclusive(Runnable)}
+ *
+ * @see Transaction
+ * @see TransactionalComponent
+ * @see TransactionalSystem
+ */
+final
+public class TransactionCoordinator {
+    private static Logger log = Sys.syslog ;
+    
+    private final Journal journal ;
+    private boolean coordinatorStarted = false ;
+
+    private final ComponentGroup components = new ComponentGroup() ;
+    // Components 
+    private ComponentGroup txnComponents = null ;
+    private List<ShutdownHook> shutdownHooks ;
+    private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple ;
+    
+    private QuorumGenerator quorumGenerator = null ;
+    //private QuorumGenerator quorumGenerator = (m) -> components ;
+
+    // Semaphore to implement "Single Active Writer" - independent of readers
+    // This is not reentrant.
+    private Semaphore writersWaiting = new Semaphore(1, true) ;
+    
+    // All transaction need a "read" lock through out their lifetime. 
+    // Do not confuse with read/write transactions.  We need a 
+    // "one exclusive, or many other" lock which happens to be called ReadWriteLock
+    // See also {@code lock} which protects the datastructures during transaction management.  
+    private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock() ;
+
+    // Coordinator wide lock object.
+    private Object coordinatorLock = new Object() ;
+
+    @FunctionalInterface
+    public interface ShutdownHook { void shutdown() ; }
+
+    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ 
+    public TransactionCoordinator(Location location) {
+        this(Journal.create(location)) ;
+    }
+    
+    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ 
+    public TransactionCoordinator(Journal journal) {
+        this(journal, null , new ArrayList<>()) ;
+    }
+
+    /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */
+    public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) {
+        this(journal, components , new ArrayList<>()) ;
+    }
+
+    //    /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ 
+//    public TransactionCoordinator(Location journalLocation) {
+//        this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>()) ;
+//    }
+
+    private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) { 
+        this.journal = journal ;
+        this.shutdownHooks = new ArrayList<>(shutdownHooks) ;
+        if ( txnComp != null ) {
+            //txnComp.forEach(x-> System.out.println(x.getComponentId().label()+" :: "+Bytes.asHex(x.getComponentId().bytes()) ) ) ;
+            txnComp.forEach(components::add);
+        }
+    }
+    
+    /** Add a {@link TransactionalComponent}.
+     * Safe to call at any time but it is good practice to add all the
+     * components before any transactions start.
+     * Internally, the coordinator ensures the add will safely happen but it
+     * does not add the component to existing transactions.
+     * This must be setup before recovery is attempted. 
+     */
+    public TransactionCoordinator add(TransactionalComponent elt) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            components.add(elt) ;
+        }
+        return this ;
+    }
+
+    /** 
+     * Remove a {@link TransactionalComponent}.
+     * @see #add 
+     */
+    public TransactionCoordinator remove(TransactionalComponent elt) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            components.remove(elt.getComponentId()) ;
+        }
+        return this ;
+    }
+
+    /**
+     * Add a shutdown hook. Shutdown is not guaranteed to be called
+     * and hence hooks may not get called.
+     */
+    public void add(TransactionCoordinator.ShutdownHook hook) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            shutdownHooks.add(hook) ;
+        }
+    }
+
+    /** Remove a shutdown hook */
+    public void remove(TransactionCoordinator.ShutdownHook hook) {
+        checkSetup() ;
+        synchronized(coordinatorLock) {
+            shutdownHooks.remove(hook) ;
+        }
+    }
+    
+    public void setQuorumGenerator(QuorumGenerator qGen) {
+        checkSetup() ;
+        this.quorumGenerator = qGen ;
+    }
+
+    public void start() {
+        checkSetup() ;
+        recovery() ;
+        coordinatorStarted = true ;
+    }
+
+    /**
+     * Replay the journal when the coordinator starts.
+     * <p>
+     * If the journal has no entries, every component is told {@code cleanStart()}
+     * and nothing else happens. Otherwise REDO and UNDO entries are collected
+     * until a COMMIT entry is read, at which point the collected group is handed
+     * to {@link #recover(List)}; an ABORT entry discards the group collected so far.
+     * Afterwards every component is told {@code finishRecovery()} and the journal is reset.
+     */
+    private /*public*/ void recovery() {
+        Iterator<JournalEntry> journalEntries = journal.entries() ;
+        if ( ! journalEntries.hasNext() ) {
+            components.forEachComponent(c -> c.cleanStart()) ;
+            return ;
+        }
+
+        log.info("Journal recovery start") ;
+        components.forEachComponent(c -> c.startRecovery()) ;
+
+        // Entries read since the last COMMIT or ABORT marker.
+        List<JournalEntry> pending = new ArrayList<>() ;
+        while ( journalEntries.hasNext() ) {
+            JournalEntry entry = journalEntries.next() ;
+            switch(entry.getType()) {
+                case REDO : case UNDO :
+                    pending.add(entry) ;
+                    break ;
+                case COMMIT :
+                    // The group is complete : replay it.
+                    recover(pending) ;
+                    pending.clear() ;
+                    break ;
+                case ABORT :
+                    // The group never committed : drop it.
+                    pending.clear() ;
+                    break ;
+            }
+        }
+
+        components.forEachComponent(c -> c.finishRecovery()) ;
+        journal.reset() ;
+        log.info("Journal recovery end") ;
+    }
+
+    /**
+     * Replay one group of journal entries into the components they belong to.
+     * UNDO entries are not handled and entries for an unknown component id are
+     * skipped; both cases are logged as warnings.
+     *
+     * @param entries the journal entries of one committed group
+     */
+    private void recover(List<JournalEntry> entries) {
+        for ( JournalEntry entry : entries ) {
+            if ( entry.getType() == UNDO ) {
+                Log.warn(TransactionCoordinator.this, "UNDO entry : not handled") ;
+                continue ;
+            }
+            ComponentId componentId = entry.getComponentId() ;
+            ByteBuffer payload = entry.getByteBuffer() ;
+            // Look up the component that wrote this entry.
+            TransactionalComponent target = components.findComponent(componentId) ;
+            if ( target == null ) {
+                Log.warn(TransactionCoordinator.this, "No component for "+componentId) ;
+                continue ;
+            }
+            target.recover(payload) ;
+        }
+    }
+
+    public void setTxnIdGenerator(TxnIdGenerator generator) {
+        this.txnIdGenerator = generator ;
+    }
+    
+    public Journal getJournal() {
+        return journal ;
+    }
+    
+    /**
+     * Detach a transaction and every component of this coordinator, capturing
+     * the per-component state so it can later be passed to {@link #attach}.
+     *
+     * @param txn the transaction to detach
+     * @return the transaction together with the state returned by each component's {@code detach()}
+     */
+    public TransactionCoordinatorState detach(Transaction txn) {
+        txn.detach() ;
+        TransactionCoordinatorState detached = new TransactionCoordinatorState(txn) ;
+        components.forEach((id, component) -> detached.componentStates.put(id, component.detach())) ;
+        // Nothing is removed from the active-transaction tracking here:
+        // the txn still counts as "active".
+        return detached ;
+    }
+
+    /**
+     * Re-attach a transaction previously detached with {@link #detach}:
+     * the transaction is attached first, then each saved component state is
+     * handed back to the component registered under the same id.
+     *
+     * @param coordinatorState the state returned by {@link #detach}
+     */
+    public void attach(TransactionCoordinatorState coordinatorState) {
+        coordinatorState.transaction.attach() ;
+        coordinatorState.componentStates.forEach(
+            (componentId, savedState) -> components.findComponent(componentId).attach(savedState)) ;
+    }
+
+    public void shutdown() {
+        if ( coordinatorLock == null )
+            return ;
+        components.forEach((id, c) -> c.shutdown()) ;
+        shutdownHooks.forEach((h)-> h.shutdown()) ;
+        coordinatorLock = null ;
+        journal.close(); 
+    }
+
+    // Are we in the initialization phase?
+    private void checkSetup() {
+        if ( coordinatorStarted )
+            throw new TransactionException("TransactionCoordinator has already been started") ;
+    }
+
+    // Are we up and running?
+    private void checkActive() {
+        if ( ! coordinatorStarted )
+            throw new TransactionException("TransactionCoordinator has not been started") ;
+        checkNotShutdown();
+    }
+
+    // Check not wrapped up
+    private void checkNotShutdown() {
+        if ( coordinatorLock == null )
+            throw new TransactionException("TransactionCoordinator has been shutdown") ;
+    }
+
+    /**
+     * Hand back the single writer permit.
+     * The permit must currently be taken; a free permit means this release has
+     * no matching acquire and is reported as an error.
+     */
+    private void releaseWriterLock() {
+        if ( writersWaiting.availablePermits() != 0 )
+            throw new TransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
+        writersWaiting.release() ;
+    }
+    
+    /**
+     * Acquire the writer lock (the single permit of {@code writersWaiting}).
+     *
+     * @param canBlock if true, wait for the permit; if false, only try to take it
+     * @return true if the permit was acquired, false if {@code canBlock} is false
+     *         and the permit was not available
+     * @throws TransactionException if the thread is interrupted while waiting;
+     *         the thread's interrupt status is set again before throwing
+     */
+    private boolean acquireWriterLock(boolean canBlock) {
+        if ( ! canBlock )
+            return writersWaiting.tryAcquire() ;
+        try {
+            writersWaiting.acquire() ;
+            return true;
+        } catch (InterruptedException e) {
+            // acquire() clears the interrupt status when it throws; restore it so
+            // code further up the stack can still see that the thread was interrupted.
+            Thread.currentThread().interrupt() ;
+            throw new TransactionException(e) ;
+        }
+    }
+    
+    /** Enter exclusive mode; block if necessary.
+     * There are no active transactions on return; new transactions will be held up in 'begin'.
+     * Return to normal (release waiting transactions, allow new transactions)
+     * with {@link #finishExclusiveMode}.
+     * <p>
+     * Do not call inside an existing transaction.
+     */
+    public void startExclusiveMode() {
+        startExclusiveMode(true);
+    }
+    
+    /** Try to enter exclusive mode. 
+     *  If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+     *  If false, there were in-progress transactions.
+     *  Return to normal (release waiting transactions, allow new transactions)
+     *  with {@link #finishExclusiveMode}.   
+     * <p>
+     * Do not call inside an existing transaction.
+     */
+    public boolean tryExclusiveMode() {
+        return tryExclusiveMode(false);
+    }
+    
+    /** Try to enter exclusive mode.  
+     *  If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'.
+     *  If false, there were in-progress transactions.
+     *  Return to normal (release waiting transactions, allow new transactions)
+     *  with {@link #finishExclusiveMode}.   
+     * <p>
+     * Do not call inside an existing transaction.
+     * @param canBlock Allow the operation block and wait for the exclusive mode lock.
+     */
+    public boolean tryExclusiveMode(boolean canBlock) {
+        return startExclusiveMode(canBlock);
+    }
+
+    /**
+     * Take the write side of the exclusivity lock.
+     *
+     * @param canBlock if true, wait for the lock; if false, only try to take it
+     * @return true if the lock is now held by the calling thread
+     */
+    private boolean startExclusiveMode(boolean canBlock) {
+        if ( ! canBlock )
+            return exclusivitylock.writeLock().tryLock() ;
+        exclusivitylock.writeLock().lock() ;
+        return true ;
+    }
+
+    /** Return to normal (release waiting transactions, allow new transactions).
+     * Must be paired with an earlier {@link #startExclusiveMode}. 
+     */
+    public void finishExclusiveMode() {
+        exclusivitylock.writeLock().unlock() ;
+    }
+
+    /** Execute an action in exclusive mode.  This method can block.
+     * Equivalent to:
+     * <pre>
+     *  startExclusiveMode() ;
+     *  try { action.run(); }
+     *  finally { finishExclusiveMode(); }
+     * </pre>
+     * 
+     * @param action
+     */
+    public void execExclusive(Runnable action) {
+        startExclusiveMode() ;
+        try { action.run(); }
+        finally { finishExclusiveMode(); }
+    }
+    
+    /** Block until no writers are active.
+     *  When this returns, this guarantees that the database is not changing
+     *  and the journal is flushed to disk.
+     * <p> 
+     * The application must call {@link #enableWriters} later.
+     * <p> 
+     * This operation must not be nested (it will block).
+     * 
+     * @see #tryBlockWriters()
+     * @see #enableWriters()
+     * 
+     */
+    public void blockWriters() {
+        acquireWriterLock(true) ;
+    }
+
+    /** Try to block all writers, or return if can't at the moment.
+     * <p>
+     * Unlike a write transaction, there is no associated transaction. 
+     * <p>
+     * If it returns true, the application must call {@link #enableWriters} later.
+     *  
+     * @see #blockWriters()
+     * @see #enableWriters()
+
+     * @return true if the operation succeeded and writers are blocked 
+     */
+    public boolean tryBlockWriters() {
+        return tryBlockWriters(false) ;
+    }
+
+    /**
+     * Block until no writers are active, optionally blocking or returning if can't at the moment.
+     * <p>
+     * Unlike a write transaction, there is no associated transaction. 
+     * <p>
+     * If it returns true, the application must call {@link #enableWriters} later.
+     * @param canBlock
+     * @return true if the operation succeeded and writers are blocked
+     */
+    public boolean tryBlockWriters(boolean canBlock) {
+        return acquireWriterLock(canBlock) ;
+    }
+    /** Allow writers.  
+     * This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()}
+     * 
+     * @see #blockWriters()
+     * @see #tryBlockWriters()
+     */ 
+    public void enableWriters() {
+        releaseWriterLock();
+    }
+    
+    /** Execute an action as if it were a write, but without starting a write transaction.
+     * This method can block.
+     * <p>
+     * Equivalent to:
+     * <pre>
+     *  blockWriters() ;
+     *  try { action.run(); }
+     *  finally { enableWriters(); }
+     * </pre>
+     * 
+     * @param action
+     */
+    public void execAsWriter(Runnable action) {
+        blockWriters() ;
+        try { action.run(); }
+        finally { enableWriters(); }
+    }
+    
+    /** Start a transaction. This may block. */
+    public Transaction begin(ReadWrite readWrite) {
+        return begin(readWrite, true) ;
+    }
+    
+    /**
+     * Start a transaction.  Returns null if this operation would block
+     * and {@code canBlock} is false.
+     * <p>
+     * Every transaction takes the read side of the exclusivity lock, so even a
+     * reader waits (or is refused) while exclusive mode is in force.
+     * A single writer policy is currently imposed so a "begin(WRITE)"
+     * may additionally block on the writer permit.
+     *
+     * @param readWrite the transaction mode; must not be null
+     * @param canBlock  whether the call may wait for the locks it needs
+     * @return the started transaction, or null if it could not be started without blocking
+     * @throws NullPointerException if {@code readWrite} is null
+     * @throws TransactionException if the coordinator has not been started or has been shut down
+     */
+    public Transaction begin(ReadWrite readWrite, boolean canBlock) {
+        // Objects.nonNull merely returns a boolean; requireNonNull actually rejects
+        // a null mode, and does so before any lock has been taken.
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        checkActive() ;
+
+        // XXX Flag to bounce writers for long term "block writers"
+        if ( false /* bounceWritersAtTheMoment */) {
+            if ( readWrite == WRITE ) {
+                throw new TransactionException("Writers currently being rejected");
+            }
+        }
+
+        // Shared side of the exclusivity lock; released in finishActiveTransaction.
+        if ( canBlock )
+            exclusivitylock.readLock().lock() ;
+        else {
+            if ( ! exclusivitylock.readLock().tryLock() )
+                return null ;
+        }
+
+        // Readers do not take the writer permit.
+        if ( readWrite == WRITE ) {
+            // Writers take a WRITE permit from the semaphore to ensure there
+            // is at most one active writer, else the attempt to start the
+            // transaction blocks.
+            // Released in notifyCommitFinish/notifyAbortFinish
+            boolean b = acquireWriterLock(canBlock) ;
+            if ( !b ) {
+                // Could not become the writer : give back the shared lock taken above.
+                exclusivitylock.readLock().unlock() ;
+                return null ;
+            }
+        }
+        // NOTE(review): nothing here releases the locks if begin$ or
+        // transaction.begin() throws - confirm whether callers rely on that.
+        Transaction transaction = begin$(readWrite) ;
+        startActiveTransaction(transaction) ;
+        transaction.begin();
+        return transaction;
+    }
+    
+    // The version is the serialization point for a transaction.
+    // All transactions on the same view of the data get the same serialization point.
+    
+    // A read transaction can be promoted if writer does not start
+    // This TransactionCoordinator provides Serializable, Read-lock-free
+    // execution.  With no item locking, a read can only be promoted
+    // if no writer started since the reader started.
+
+    /* The version of the data - incremented when a write transaction commits.
+     * This is the version with respect to the last committed transaction.
+     * Aborts do not cause the data version to advance. 
+     * This counter never goes backwards.
+     */ 
+    private final AtomicLong dataVersion = new AtomicLong(0) ;
+    
+    /**
+     * The part of 'begin' that runs while holding {@code coordinatorLock}:
+     * allocate the transaction id, create the {@link Transaction} stamped with
+     * the current data version, and begin the transaction in each chosen component.
+     *
+     * @param readWrite the mode of the new transaction
+     * @return the new transaction; it has not yet been recorded as active by this method
+     */
+    private Transaction begin$(ReadWrite readWrite) {
+        synchronized(coordinatorLock) {
+            // Thread safe part of 'begin'
+            // Allocate the transaction serialization point.
+            TxnId txnId = txnIdGenerator.generate() ;
+            // Filled in below; the Transaction is given this same list object.
+            List<SysTrans> sysTransList = new ArrayList<>() ;
+            Transaction transaction = new Transaction(this, txnId, readWrite, dataVersion.get(), sysTransList) ;
+            
+            // Note: this local shadows the field of the same name.
+            ComponentGroup txnComponents = chooseComponents(this.components, readWrite) ;
+            
+            try {
+                // One SysTrans per participating component.
+                txnComponents.forEachComponent(elt -> {
+                    SysTrans sysTrans = new SysTrans(elt, transaction, txnId) ;
+                    sysTransList.add(sysTrans) ; }) ;
+                // Calling each component must be inside the lock
+                // so that a transaction does not commit overlapping with setup.
+                // If it did, different components might end up starting from
+                // different start states of the overall system.
+                txnComponents.forEachComponent(elt -> elt.begin(transaction)) ;
+            } catch(Throwable ex) {
+                // Careful about incomplete.
+                // No clean-up is done here at present: the exception is just rethrown.
+                //abort() ;
+                //complete() ;
+                throw ex ;
+            }
+            return transaction ;
+        }
+    }
+    
+    /**
+     * Choose the components that take part in a new transaction.
+     * <p>
+     * Without a quorum generator, or when the generator returns null, the full
+     * group is used. Otherwise the generated group is used; each member is
+     * checked against the coordinator's own group and a warning is logged for
+     * any member that is missing or different.
+     *
+     * @param components the coordinator's full component group
+     * @param readWrite  the mode of the transaction being started
+     * @return the component group for the transaction
+     */
+    private ComponentGroup chooseComponents(ComponentGroup components, ReadWrite readWrite) {
+        if ( quorumGenerator == null )
+            return components ;
+        ComponentGroup cg = quorumGenerator.genQuorum(readWrite) ;
+        if ( cg == null )
+            return components ;
+        cg.forEach((id, c) -> {
+            TransactionalComponent tcx = components.findComponent(id) ;
+            // findComponent returns null for an id it does not know; that is exactly
+            // the situation this warning is for, so it must not end in an NPE.
+            if ( tcx == null || ! tcx.equals(c) )
+                log.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup") ; 
+        }) ;
+        if ( log.isDebugEnabled() )
+            log.debug("Custom ComponentGroup for transaction "+readWrite+": size="+cg.size()+" of "+components.size()) ;
+        return cg ;
+    }
+
+    /** Is promotion of transactions enabled? */ 
+    /*private*/public/*for development*/ static boolean promotion               = true ;
+    
+    /** Control of whether a transaction promotion can see any commits that
+     *  happened between this transaction starting and it promoting.
+     *  A form of "ReadCommitted".   
+     */
+    /*private*/public/*for development*/ static boolean readCommittedPromotion  = false ;
+    
+    /** Whether to wait for writers when trying to promote */
+    private static final boolean promotionWaitForWriters = true;
+
+    /** Attempt to promote a transaction from READ to WRITE.
+     * No-op for a transaction already a writer.
+     * <p>
+     * A promotion that can not be done is reported by returning false
+     * (a {@link TransactionException} from promoting the components is caught here).
+     *
+     * @param transaction the transaction to promote
+     * @return true if the transaction is a writer on return; false if promotion
+     *         is disabled or could not be done
+     */
+    /*package*/ boolean promoteTxn(Transaction transaction) {
+        if ( ! promotion )
+            return false;
+
+        if ( transaction.getMode() == WRITE )
+            return true ;
+        
+        // Has there been a writer active since the transaction started?
+        // Do a test outside the lock - only dataVersion can change and that increases.
+        // If "read committed transactions" are not allowed, the data has changed in a way we
+        // do not wish to expose.
+        // If this test fails outside the lock it will fail inside.
+        // If it passes, we have to test again in case there is an active writer.
+        
+        if ( ! readCommittedPromotion ) {
+            long txnEpoch = transaction.getDataVersion() ;      // The transaction-start point.
+            long currentEpoch = dataVersion.get() ;             // The data serialization point.
+            
+            if ( txnEpoch < currentEpoch )
+                // The data has changed and "read committed" not allowed.
+                // We can reject now.
+                return false ;
+        }
+        
+        // Once we have acquireWriterLock, we are single writer.
+        // We may have to discard writer status because once we can make the definite
+        // decision on promotion, we find we can't promote after all.
+        if ( readCommittedPromotion ) {
+            /*
+             * acquireWriterLock(true) ;
+             * synchronized(coordinatorLock) {
+             * begin$ ==>
+             *    reset transaction.
+             *    promote components
+             *    reset dataVersion
+             */
+            acquireWriterLock(true) ;
+            synchronized(coordinatorLock) {
+                try { 
+                    transaction.promoteComponents() ;
+                    // Because we want to see the new state of the data.
+                    //transaction.resetDataVersion(dataVersion.get());
+                } catch (TransactionException ex) {
+                    // Promotion failed: abort (ignoring any failure of the abort itself)
+                    // and hand back the writer lock taken above.
+                    try { transaction.abort(); } catch(RuntimeException ex2) {}
+                    releaseWriterLock();
+                    return false ;
+                }
+            }
+            return true;
+        }
+        
+        if ( ! waitForWriters() )
+            // Failed to become a writer.
+            return false;
+        // Now a proto-writer.
+        
+        synchronized(coordinatorLock) {
+            // Not read committed.
+            // Need to check the data version once we are the writer and all previous
+            // writers have committed or aborted.
+            // Has there been a writer active since the transaction started?
+            long txnEpoch = transaction.getDataVersion() ;    // The transaction-start point.
+            long currentEpoch = dataVersion.get() ;         // The data serialization point.
+
+            if ( txnEpoch != currentEpoch ) {
+                // Failed to promote.
+                releaseWriterLock();
+                return false ;
+            }
+            
+            // ... we have now got the writer lock ...
+            try { 
+                transaction.promoteComponents() ;
+                // No need to reset the data version because strict isolation. 
+            } catch (TransactionException ex) {
+                // Promotion failed: abort (ignoring any failure of the abort itself)
+                // and hand back the writer lock.
+                try { transaction.abort(); } catch(RuntimeException ex2) {}
+                releaseWriterLock();
+                return false ;
+            }
+        }
+        return true ;
+    }
+        
+    /**
+     * Take the writer lock on behalf of a promoting transaction, blocking or
+     * not according to {@code promotionWaitForWriters}.
+     *
+     * @return true if the writer lock was acquired
+     */
+    private boolean waitForWriters() {
+        return acquireWriterLock(promotionWaitForWriters) ;
+    }
+
+    // Called once by Transaction after the action of commit()/abort() or end()
+    /** Signal that the transaction has finished. */  
+    /*package*/ void completed(Transaction transaction) {
+        finishActiveTransaction(transaction);
+        journal.reset() ;
+    }
+
+    /**
+     * Run the prepare step of a commit: ask each component of the transaction
+     * for its prepare data and write every non-null result to the journal.
+     *
+     * @param transaction the transaction being prepared
+     */
+    /*package*/ void executePrepare(Transaction transaction) {
+        // Done in the coordinator because it needs access to the journal.
+        notifyPrepareStart(transaction);
+        transaction.getComponents().forEach(sysTrans -> {
+            TransactionalComponent component = sysTrans.getComponent() ;
+            ByteBuffer prepared = component.commitPrepare(transaction) ;
+            if ( prepared == null )
+                // This component has nothing to journal.
+                return ;
+            journal.write(new PrepareState(component.getComponentId(), prepared)) ;
+        }) ;
+        notifyPrepareFinish(transaction);
+    }
+
+    /**
+     * Perform the commit of a transaction while holding {@code coordinatorLock}.
+     * <p>
+     * Steps, in order: sync the journal (the commit point), run {@code commit},
+     * truncate the journal, run {@code finish}, advance the data version for a
+     * WRITE transaction, then call {@code notifyCommitFinish}, which releases
+     * the writer lock for a WRITE transaction.
+     *
+     * @param transaction the transaction being committed
+     * @param commit the transaction's commit actions, run after the journal sync
+     * @param finish run after the journal has been truncated
+     */
+    /*package*/ void executeCommit(Transaction transaction,  Runnable commit, Runnable finish) {
+        // This is the commit point. 
+        synchronized(coordinatorLock) {
+            // *** COMMIT POINT
+            journal.sync() ;
+            // *** COMMIT POINT
+            // Now run the Transactions commit actions. 
+            commit.run() ;
+            journal.truncate(0) ;
+            // and tell the Transaction it's finished. 
+            finish.run() ;
+            // Bump global serialization point if necessary.
+            if ( transaction.getMode() == WRITE )
+                advanceDataVersion() ;
+            notifyCommitFinish(transaction) ;
+        }
+    }
+    
+    // Inside the global transaction start/commit lock.
+    private void advanceDataVersion() {
+        dataVersion.incrementAndGet();
+    }
+    
+    /*package*/ void executeAbort(Transaction transaction, Runnable abort) {
+        notifyAbortStart(transaction) ;
+        abort.run();
+        notifyAbortFinish(transaction) ;
+    }
+    
+    // Active transactions: this is (the missing) ConcurrentHashSet
+    private final static Object dummy                   = new Object() ;    
+    private ConcurrentHashMap<Transaction, Object> activeTransactions = new ConcurrentHashMap<>() ;
+    private AtomicLong activeTransactionCount = new AtomicLong(0) ;
+    private AtomicLong activeReadersCount = new AtomicLong(0) ;
+    private AtomicLong activeWritersCount = new AtomicLong(0) ;
+    
+    /**
+     * Record a newly begun transaction: bump the begin counters and the
+     * active counters for its mode, and add it to the active set.
+     *
+     * @param transaction the transaction that has just been created
+     */
+    private void startActiveTransaction(Transaction transaction) {
+        // The lock ensures all the counters move together.
+        // Thread safe - the Transaction object has not been let out yet.
+        synchronized(coordinatorLock) {
+            countBegin.incrementAndGet() ;
+            switch(transaction.getMode()) {
+                case READ:
+                    countBeginRead.incrementAndGet() ;
+                    activeReadersCount.incrementAndGet() ;
+                    break ;
+                case WRITE:
+                    countBeginWrite.incrementAndGet() ;
+                    activeWritersCount.incrementAndGet() ;
+                    break ;
+            }
+            activeTransactionCount.incrementAndGet() ;
+            activeTransactions.put(transaction, dummy) ;
+        }
+    }
+    
+    /**
+     * Remove a transaction from the active set, adjust the counters and
+     * release the read side of the exclusivity lock.
+     * A transaction that is not in the active set is ignored entirely,
+     * which makes repeated calls harmless.
+     *
+     * @param transaction the transaction that has finished
+     */
+    private void finishActiveTransaction(Transaction transaction) {
+        synchronized(coordinatorLock) {
+            // Idempotent : only the first call for a transaction finds it in the map.
+            if ( activeTransactions.remove(transaction) == null )
+                return ;
+            countFinished.incrementAndGet() ;
+            activeTransactionCount.decrementAndGet() ;
+            switch(transaction.getMode()) {
+                case READ:
+                    activeReadersCount.decrementAndGet() ;
+                    break ;
+                case WRITE:
+                    activeWritersCount.decrementAndGet() ;
+                    break ;
+            }
+        }
+        // Done outside the coordinator lock.
+        exclusivitylock.readLock().unlock() ;
+    }
+    
+    public long countActiveReaders()    { return activeReadersCount.get() ; } 
+    public long countActiveWriter()     { return activeWritersCount.get() ; } 
+    public long countActive()           { return activeTransactionCount.get(); }
+    
+    // notify*Start/Finish called round each transaction lifecycle step
+    // Called in cooperation between Transaction and TransactionCoordinator
+    // depending on who is actually do the work of each step.
+
+    /*package*/ void notifyPrepareStart(Transaction transaction) {}
+
+    /*package*/ void notifyPrepareFinish(Transaction transaction) {}
+
+    // Writers released here - can happen because of commit() or abort(). 
+
+    private void notifyCommitStart(Transaction transaction) {}
+    
+    private void notifyCommitFinish(Transaction transaction) {
+        if ( transaction.getMode() == WRITE )
+            releaseWriterLock();
+    }
+    
+    private void notifyAbortStart(Transaction transaction) { }
+    
+    private void notifyAbortFinish(Transaction transaction) {
+        if ( transaction.getMode() == WRITE )
+            releaseWriterLock();
+    }
+
+    /*package*/ void notifyEndStart(Transaction transaction) { }
+
+    /*package*/ void notifyEndFinish(Transaction transaction) {}
+
+    // Called by Transaction once at the end of first commit()/abort() or end()
+    
+    /*package*/ void notifyCompleteStart(Transaction transaction) { }
+
+    /*package*/ void notifyCompleteFinish(Transaction transaction) { }
+
+    // Coordinator state.
+    private final AtomicLong countBegin         = new AtomicLong(0) ;
+
+    private final AtomicLong countBeginRead     = new AtomicLong(0) ;
+
+    private final AtomicLong countBeginWrite    = new AtomicLong(0) ;
+
+    private final AtomicLong countFinished      = new AtomicLong(0) ;
+
+    // Access counters
+    public long countBegin()        { return countBegin.get() ; }
+
+    public long countBeginRead()    { return countBeginRead.get() ; }
+
+    public long countBeginWrite()   { return countBeginWrite.get() ; }
+
+    public long countFinished()     { return countFinished.get() ; }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
new file mode 100644
index 0000000..7402cc1
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionCoordinatorState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import java.util.HashMap ;
+import java.util.Map ;
+
+/**
+ * Holder that pairs a {@link Transaction} with a map of per-component
+ * state ({@link SysTransState}) keyed by {@link ComponentId}.
+ */
+public class TransactionCoordinatorState {
+    /*package*/ final Transaction transaction ;
+    /*package*/ Map<ComponentId, SysTransState> componentStates = new HashMap<>() ;
+
+    /*package*/ TransactionCoordinatorState(Transaction transaction) {
+        this.transaction = transaction ;
+    }
+
+    /** Return the transaction given at construction. */
+    public Transaction getTransaction() {
+        return this.transaction ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
new file mode 100644
index 0000000..f15a22b
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import org.apache.jena.sparql.JenaTransactionException ;
+
+public class TransactionException extends JenaTransactionException {
+    public TransactionException()                                  { super(); }
+    public TransactionException(String message)                    { super(message); }
+    public TransactionException(Throwable cause)                   { super(cause) ; }
+    public TransactionException(String message, Throwable cause)   { super(message, cause) ; }
+
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
new file mode 100644
index 0000000..a0a9483
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionInfo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import static org.seaborne.dboe.transaction.txn.TxnState.INACTIVE ;
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A view that provides information about a transaction.
+ *
+ * @see Transaction
+ */
+public interface TransactionInfo {
+
+    /** The transaction lifecycle state. */
+    TxnState getState() ;
+
+    /**
+     * The data version of this transaction.
+     * <p>
+     * Each transaction is allocated a serialization point by the transaction
+     * coordinator. Normally, the data version is related to that point and it
+     * increases over time as the data changes. Two readers can have the same
+     * serialization point - they are working with the same view of the data.
+     */
+    long getDataVersion() ;
+
+    /** Has the transaction started? */
+    boolean hasStarted() ;
+
+    /** Has the transaction finished (has commit/abort/end been called)? */
+    boolean hasFinished() ;
+
+    /** Has the transaction gone through all lifecycle states? */
+    boolean hasFinalised() ;
+
+    /** Get the transaction id for this transaction. Unique within this OS process (JVM) at least. */
+    TxnId getTxnId() ;
+
+    /**
+     * What mode is this transaction?
+     * This may change from {@code READ} to {@code WRITE} in a transaction's lifetime.
+     */
+    ReadWrite getMode() ;
+
+    /**
+     * Is this a view of a READ transaction?
+     * Convenience operation equivalent to {@code (getMode() == READ)}.
+     */
+    default boolean isReadTxn() {
+        return ReadWrite.READ == getMode() ;
+    }
+
+    /**
+     * Is this a view of a WRITE transaction?
+     * Convenience operation equivalent to {@code (getMode() == WRITE)}.
+     */
+    default boolean isWriteTxn() {
+        return ReadWrite.WRITE == getMode() ;
+    }
+
+    /**
+     * Is this a view of a transaction that is active?
+     * Equivalent to {@code getState() != INACTIVE}.
+     */
+    default boolean isActiveTxn() {
+        final TxnState current = getState() ;
+        return current != INACTIVE ;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
new file mode 100644
index 0000000..0e88c57
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalBase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import java.util.Objects ;
+
+import org.apache.jena.atlas.logging.Log ;
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * Framework for implementing a Transactional.
+ * <p>
+ * The transaction of the calling thread is held in a {@link ThreadLocal};
+ * lifecycle operations are passed on to the {@link TransactionCoordinator}
+ * or to the thread's {@link Transaction}.
+ */
+public class TransactionalBase implements TransactionalSystem {
+    // Help debugging by generating names for Transactionals.
+    private final String label ;
+    protected boolean isShutdown = false ;
+    protected final TransactionCoordinator txnMgr ;
+
+    // Per thread transaction.
+    private final ThreadLocal<Transaction> theTxn = new ThreadLocal<>() ;
+
+    /**
+     * @param label  name used to prefix some error messages; may be {@code null}
+     * @param txnMgr the coordinator that transactions are begun on
+     */
+    public TransactionalBase(String label, TransactionCoordinator txnMgr) {
+        this.label = label ;
+        this.txnMgr = txnMgr ;
+    }
+
+    /** Create an unlabelled instance. */
+    public TransactionalBase(TransactionCoordinator txnMgr) {
+        this(null, txnMgr) ;
+    }
+
+    @Override
+    public TransactionCoordinator getTxnMgr() {
+        return txnMgr ;
+    }
+
+    // Development
+    private static final boolean trackAttachDetach = false ;
+
+    /**
+     * Detach the calling thread's transaction and return the coordinator state for it.
+     * The thread's transaction slot is cleared whether or not a transaction was present.
+     *
+     * @throws TransactionException if shut down, or if there is no coordinator state
+     *         to return (which includes the thread having no transaction)
+     */
+    @Override
+    public TransactionCoordinatorState detach() {
+        if ( trackAttachDetach )
+            Log.info(this,  ">> detach");
+        checkRunning() ;
+        // Not if it just committed but before end.
+        //checkActive() ;
+        Transaction txn = theTxn.get() ;
+        TransactionCoordinatorState coordinatorState = null ;
+        if ( txn != null )
+            // We are not ending.
+            coordinatorState = txnMgr.detach(txn) ;
+        if ( trackAttachDetach )
+            Log.info(this,  "  theTxn = "+txn) ;
+        // Cleared unconditionally, before the "Not attached" check below.
+        theTxn.remove() ;
+        if ( trackAttachDetach )
+            Log.info(this,  "<< detach");
+        if ( coordinatorState == null )
+            throw new TransactionException("Not attached") ;
+        return coordinatorState ;
+    }
+
+    /**
+     * Attach a previously detached transaction to the calling thread.
+     *
+     * @param coordinatorState state returned by {@link #detach()}; must not be {@code null}
+     * @throws NullPointerException if {@code coordinatorState} is {@code null}
+     * @throws TransactionException if shut down, if the thread is already in a
+     *         transaction, or if the transaction is not in state {@code DETACHED}
+     */
+    @Override
+    public void attach(TransactionCoordinatorState coordinatorState) {
+        if ( trackAttachDetach )
+            Log.info(this,  ">> attach");
+        // requireNonNull enforces the precondition; Objects.nonNull merely returns a boolean.
+        Objects.requireNonNull(coordinatorState, "coordinatorState") ;
+        checkRunning() ;
+        checkNotActive() ;
+        TxnState txnState = coordinatorState.transaction.getState() ;
+        if ( txnState != TxnState.DETACHED )
+            throw new TransactionException("Not a detached transaction") ;
+        txnMgr.attach(coordinatorState) ;
+        if ( trackAttachDetach )
+            Log.info(this,  "  theTxn = "+coordinatorState.transaction) ;
+        theTxn.set(coordinatorState.transaction);
+        if ( trackAttachDetach )
+            Log.info(this,  "<< attach");
+    }
+
+    /**
+     * Begin a transaction on the coordinator and bind it to the calling thread.
+     *
+     * @param readWrite the mode to begin in; must not be {@code null}
+     * @throws NullPointerException if {@code readWrite} is {@code null}
+     * @throws TransactionException if shut down or already in a transaction
+     */
+    @Override
+    public final void begin(ReadWrite readWrite) {
+        // requireNonNull enforces the precondition; Objects.nonNull merely returns a boolean.
+        Objects.requireNonNull(readWrite, "readWrite") ;
+        checkRunning() ;
+        checkNotActive() ;
+        Transaction transaction = txnMgr.begin(readWrite) ;
+        theTxn.set(transaction) ;
+    }
+
+    /** Ask the calling thread's transaction to promote; returns its answer. */
+    @Override
+    public boolean promote() {
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        return txn.promote() ;
+    }
+
+    @Override
+    public final void commit() {
+        checkRunning() ;
+        TransactionalSystem.super.commit() ;
+    }
+
+    @Override
+    public void commitPrepare() {
+        Transaction txn = getValidTransaction() ;
+        txn.prepare() ;
+    }
+
+    /** Commit the thread's transaction, then end it and clear the thread's slot. */
+    @Override
+    public void commitExec() {
+        Transaction txn = getValidTransaction() ;
+        txn.commit() ;
+        _end() ;
+    }
+
+//    /** Signal end of commit phase */
+//    @Override
+//    public void commitEnd() {
+//        _end() ;
+//    }
+
+    /** Abort the thread's transaction; the transaction is ended even if the abort throws. */
+    @Override
+    public final void abort() {
+        checkRunning() ;
+        checkActive() ;
+        Transaction txn = getValidTransaction() ;
+        try { txn.abort() ; }
+        finally { _end() ; }
+    }
+
+    @Override
+    public final void end() {
+        checkRunning() ;
+        // Don't check if active or if any thread locals exist
+        // because this may have already been called.
+        // txn.get() ; -- may be null -- test repeat calls.
+        _end() ;
+    }
+
+    /**
+     * Return the Read/write state (or null when not in a transaction)
+     */
+    @Override
+    final
+    public ReadWrite getState() {
+        checkRunning() ;
+        // tricky - touching theTxn causes it to initialize.
+        Transaction txn = theTxn.get() ;
+        if ( txn != null )
+            return txn.getMode() ;
+        // Undo the initialization caused by get().
+        theTxn.remove() ;
+        return null ;
+    }
+
+    @Override
+    final
+    public TransactionInfo getTransactionInfo() {
+        return getThreadTransaction() ;
+    }
+
+    /** Return the calling thread's transaction, or {@code null} if there is none. */
+    @Override
+    final
+    public Transaction getThreadTransaction() {
+        Transaction txn = theTxn.get() ;
+        // Touched the thread local so it is defined now.
+//        if ( txn == null )
+//            theTxn.remove() ;
+        return txn ;
+    }
+
+    /** Get the transaction, checking there is one */
+    private Transaction getValidTransaction() {
+        Transaction txn = theTxn.get() ;
+        if ( txn == null )
+            throw new TransactionException("Not in a transaction") ;
+        return txn ;
+    }
+
+    private void checkRunning() {
+//        if ( ! hasStarted )
+//            throw new TransactionException("Not started") ;
+
+        if ( isShutdown )
+            throw new TransactionException("Shutdown") ;
+    }
+
+    /**
+     * Shutdown component, aborting any in-progress transactions. This operation
+     * is not guaranteed to be called.
+     */
+    public void shutdown() {
+        txnMgr.shutdown() ;
+        isShutdown = true ;
+    }
+
+    /** Prefix {@code msg} with the label, if one was given. */
+    protected String label(String msg) {
+        if ( label == null )
+            return msg ;
+        return label+": "+msg ;
+    }
+
+    final
+    protected void checkActive() {
+        checkNotShutdown() ;
+        if ( ! isInTransaction() )
+            throw new TransactionException(label("Not in an active transaction")) ;
+    }
+
+    final
+    protected void checkNotActive() {
+        checkNotShutdown() ;
+        if ( isInTransaction() )
+            throw new TransactionException(label("Currently in an active transaction")) ;
+    }
+
+    final
+    protected void checkNotShutdown() {
+        if ( isShutdown )
+            throw new TransactionException(label("Already shutdown")) ;
+    }
+
+    /** End the thread's transaction, if any, and always clear the thread's slot afterwards. */
+    private final void _end() {
+        Transaction txn = theTxn.get() ;
+        if ( txn != null ) {
+            try {
+                // Can throw an exception on begin(W)...end().
+                txn.end() ;
+            } finally {
+                // remove() alone clears the slot; a preceding set(null) adds nothing.
+                theTxn.remove() ;
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
new file mode 100644
index 0000000..5057ae6
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponent.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+/** Interface for components of a transaction system.
+* <p><br/>
+* The {@link TransactionCoordinator} manages a number of components
+* which provide the {@link TransactionalComponent} interface.
+* <p><br/>
+* When a new coordinator starts, typically being when the in-process system starts,
+* there is a recovery phase when work from a previous coordinator is recovered.
+* Transactions were either properly committed by the previous coordinator,
+* and hence redo actions (finalization) should be done,
+* or they were not, in which case undo actions may be needed.
+* Transactions to discard are not notified, only fully committed transactions are
+* notified during recovery. The component may need to keep its own record of
+* undo actions needed across restarts.
+* <p><br/>
+* Lifecycle of startup:
+* <ul>
+* <li>{@link #startRecovery}
+* <li>{@link #recover} for each committed/durable transaction (redo actions)
+* <li>{@link #finishRecovery}, discarding any other transactions (undo actions).
+* </ul>
+* <p><br/>
+* Lifecycle of a read transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #complete}
+* </ul>
+* <br/>
+* A read transaction may also include {@code commit} or {@code abort} lifecycles.
+* {@link #commitPrepare} and {@link #commitEnd} are not called.
+*<p><br/>
+* Lifecycle of a write transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #commitPrepare}
+* <li>{@link #commit} or {@link #abort}
+* <li>{@link #commitEnd}
+* <li>{@link #complete} including abort
+* </ul>
+* <br/>
+* or if the application aborts the transaction:
+* <ul>
+* <li>{@link #begin}
+* <li>{@link #abort}
+* <li>{@link #complete}
+* </ul>
+* <p>
+* {@link #complete} may be called out of sequence and it forces an abort if before 
+* {@link #commitPrepare}. Once {@link #commitPrepare} has been called, the component
+* can not decide whether to commit finally or to cause a system abort; it must wait 
+* for the coordinator. After {@link #commitEnd}, the coordinator has definitely 
+* committed the overall transaction and local prepared state can be released, and changes
+* made to the permanent state of the component.
+*
+* @see Transaction
+* @see TransactionCoordinator
+*/
+
+public interface TransactionalComponent
+{
+    /**
+     * Every component <i>instance</i> must supply a unique number.
+     * It is used to route journal entries to subsystems, including across restarts/recovery. 
+     * Uniqueness scope is within the same {@link TransactionCoordinator},
+     * and the same across restarts.  
+     * <p>
+     * If a component imposes the rule of one-per-{@link TransactionCoordinator},
+     * the same number can be used (if different from all other component type instances).
+     * <p>
+     * If a component can have multiple instances per {@link TransactionCoordinator},
+     * for example indexes, each must have a unique instance id. 
+     */
+    public ComponentId getComponentId() ;
+
+    // ---- Recovery phase
+    /** Start of the recovery phase */
+    public void startRecovery() ;
+    
+    /** Notification that {@code ref} was really committed and is being recovered.
+     *  
+     * @param ref Same bytes as were written during prepare originally.
+     */
+    public void recover(ByteBuffer ref) ;
+    
+    /** End of the recovery phase */
+    public void finishRecovery() ;
+
+    /** Indicate that no recovery is being done (the journal thinks everything was completed last time) */
+    public void cleanStart() ;
+
+    // ---- Normal operation
+    
+    /** Start a transaction for this component. */ 
+    public void begin(Transaction transaction) ;
+    
+    /** Promote a component in a transaction.
+     * <p>
+     *  May return "false" for "can't do that" if the transaction can not be promoted.
+     *  <p>
+     *  May throw {@link UnsupportedOperationException} if promotion is not supported.
+     */
+    public boolean promote(Transaction transaction) ;
+
+    /** Prepare for a commit.
+     *  Returns some bytes that will be written to the journal.
+     *  The journal remains valid until {@link #commitEnd} is called.
+     */
+    public ByteBuffer commitPrepare(Transaction transaction) ;
+
+    /** Commit a transaction (make durable).
+     * Other components may not have been committed yet and recovery may occur still.
+     * Permanent state should not be finalised until {@link #commitEnd}.
+     */
+    public void commit(Transaction transaction) ;
+    
+    /** Signal all commits on all components are done (the component can clear up now) */  
+    public void commitEnd(Transaction transaction) ;
+
+    /** Abort a transaction (undo the effect of a transaction) */   
+    public void abort(Transaction transaction) ;
+
+    /** Finalization - the coordinator will not mention the transaction again
+     *  although recovery after a crash may do so.
+     */
+    public void complete(Transaction transaction) ;
+    
+    // ---- End of operations
+    
+    /** Detach this component from the transaction of the current thread
+     * and return some internal state that can be used in a future call of 
+     * {@link #attach(SysTransState)}
+     * <p>
+     * After this call, the component is not in a transaction but the
+     * existing transaction still exists. The thread may start a new
+     * transaction; that transaction is completely independent of the
+     * detached transaction.
+     * <p>
+     * Returns {@code null} if the current thread is not in a transaction.
+     * The component may return null to indicate it has no state. 
+     * The returned system state should be used in a call to {@link #attach(SysTransState)}
+     * and the transaction ended in the usual way. 
+     *   
+     */
+    public SysTransState detach() ;
+    
+    /** Set the current thread to be in the transaction.  The {@code systemState}
+     * must be obtained from a call of {@link #detach()}.
+     * This method can only be called once per {@code systemState}.
+     */
+    public void attach(SysTransState systemState) ;
+    
+    /** Shutdown component, aborting any in-progress transactions.
+     * This operation is not guaranteed to be called.
+     */
+    public void shutdown() ;
+    
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
new file mode 100644
index 0000000..8dc8530
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentBase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/**
+ * A transaction component in which every lifecycle step does nothing. It can
+ * serve as a helper for management tasks hooked into the transaction
+ * component lifecycle but which are not stateful across restarts.
+ */
+public class TransactionalComponentBase<X> extends TransactionalComponentLifecycle<X> {
+
+    public TransactionalComponentBase(ComponentId id) {
+        super(id) ;
+    }
+
+    // ---- Recovery phase: no action at any step.
+
+    @Override
+    public void startRecovery() { }
+
+    @Override
+    public void recover(ByteBuffer ref) { }
+
+    @Override
+    public void finishRecovery() { }
+
+    @Override
+    public void cleanStart() { }
+
+    // ---- Transaction lifecycle: no component state is created, no journal bytes are produced.
+
+    /** No per-transaction state: always {@code null}. */
+    @Override
+    protected X _begin(ReadWrite readWrite, TxnId txnId) { return null ; }
+
+    /** No promoted state: always {@code null}. */
+    @Override
+    protected X _promote(TxnId txnId, X state) { return null ; }
+
+    /** No prepare bytes: always {@code null}. */
+    @Override
+    protected ByteBuffer _commitPrepare(TxnId txnId, X state) { return null ; }
+
+    @Override
+    protected void _commit(TxnId txnId, X state) { }
+
+    @Override
+    protected void _commitEnd(TxnId txnId, X state) { }
+
+    @Override
+    protected void _abort(TxnId txnId, X state) { }
+
+    @Override
+    protected void _complete(TxnId txnId, X state) { }
+
+    // ---- End of operations
+
+    @Override
+    protected void _shutdown() { }
+
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
new file mode 100644
index 0000000..f4aeb53
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentLifecycle.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import static org.seaborne.dboe.transaction.txn.TxnState.* ;
+
+import java.nio.ByteBuffer ;
+import java.util.Objects ;
+
+import org.apache.jena.query.ReadWrite ;
+
+import org.apache.jena.atlas.lib.InternalErrorException ;
+
+/** 
+ * Base implementation of the component interface for {@link TransactionalComponent}.
+ */
+public abstract class TransactionalComponentLifecycle<X> implements TransactionalComponent {
+    
+    // Pass down recovery operations.
+    // This class has no transaction-recorded state.
+    @Override public abstract void startRecovery();
+    @Override public abstract void recover(ByteBuffer ref);
+    @Override public abstract void finishRecovery();
+    
+    // ---- Normal operation
+    
+    private static final boolean CHECKING = false ;
+    private ThreadLocal<TxnState> trackTxn = CHECKING ? ThreadLocal.withInitial(() -> INACTIVE) : null ; 
+    
+    // Access to these two must be via the getter/setters below only.
+    // Allows stuff for thread switching.
+    private ThreadLocal<Transaction> threadTxn = new ThreadLocal<>() ;
+    private ThreadLocal<X> componentState = new ThreadLocal<>() ;
+    private final ComponentId componentId ;
+    
+    /**
+     * @param componentId the identifier later returned by {@link #getComponentId()}
+     */
+    protected TransactionalComponentLifecycle(ComponentId componentId) {
+        this.componentId = componentId ;
+    }
+    
+    /** Return the component id supplied to the constructor. */
+    @Override
+    public ComponentId getComponentId() {
+        return componentId ;
+    }
+
+//    // Very dangerous!
+//    protected void setForThread(Transaction txn, X state) {
+//        threadTxn.set(txn);
+//        componentState.set(state);
+//    }
+    
+    /**
+     * Start a transaction on the calling thread.
+     * The transaction is recorded in this thread's slot, then the subclass
+     * supplies its per-transaction state through {@code _begin}.
+     *
+     * @param transaction the transaction being started
+     * @throws NullPointerException if {@code transaction} is null
+     */
+    @Override
+    public final void begin(Transaction transaction) {
+        Objects.requireNonNull(transaction) ;
+        setTransaction(transaction) ;
+        // The two tracking calls do nothing unless CHECKING is switched on.
+        checkState(INACTIVE, COMMITTED, ABORTED) ;
+        setTrackTxn(ACTIVE) ;
+        setDataState(_begin(transaction.getMode(), transaction.getTxnId())) ;
+    }
+    
+    /** Promote a component in a transaction */
+    @Override
+    final
+    public boolean promote(Transaction transaction) {
+        Objects.requireNonNull(transaction) ;
+        checkState(ACTIVE) ;
+        X newState = _promote(transaction.getTxnId(), getDataState());
+        if ( newState == null )
+            return false;
+        setDataState(newState);
+        return true;
+    }
+    
+    /** Commit a transaction (make durable): prepare - commit - cleanup */  
+    @Override
+    final
+    public ByteBuffer commitPrepare(Transaction transaction) {
+        checkAligned(transaction) ;
+        checkState(ACTIVE) ;
+        try { return _commitPrepare(transaction.getTxnId(), getDataState()) ; }
+        finally { setTrackTxn(PREPARE) ; }
+    }
+
+    @Override
+    final
+    public void commit(Transaction transaction) {
+        checkAligned(transaction) ;
+        checkState(PREPARE) ;
+        _commit(transaction.getTxnId(), getDataState());
+        setTrackTxn(COMMIT) ;
+    }
+    
+    @Override
+    final
+    public void commitEnd(Transaction transaction) {
+        checkAligned(transaction) ;
+        checkState(COMMIT) ;
+        _commitEnd(transaction.getTxnId(), getDataState());
+        setTrackTxn(COMMITTED) ;
+        internalComplete(transaction) ;
+    }
+
+    @Override 
+    final
+    public void abort(Transaction transaction) {
+        checkAligned(transaction) ;
+        checkState(ACTIVE, PREPARE, COMMIT) ;
+        _abort(transaction.getTxnId(), getDataState()) ;
+        setTrackTxn(ABORTED) ;
+        internalComplete(transaction) ;
+    }
+
+    /**
+     * Common tail for ending a transaction on this thread: notify the subclass
+     * through {@code _complete}, then drop this thread's thread-local state.
+     */
+    private void internalComplete(Transaction transaction) {
+        TxnId id = transaction.getTxnId() ;
+        X data = getDataState() ;
+        _complete(id, data) ;
+        setTrackTxn(INACTIVE) ;
+        releaseThreadState() ;
+    }
+
+    /**
+     * End-of-transaction call.
+     * Does nothing if the transaction reports {@code hasFinished()}.
+     * Otherwise {@code _complete} is called and, for a READ transaction,
+     * {@code internalComplete} then releases this thread's state.
+     */
+    @Override
+    final
+    public void complete(Transaction transaction) {
+        if ( transaction.hasFinished() )
+            return ;
+        checkAligned(transaction) ;
+        ReadWrite m = getReadWriteMode() ;
+        // State checks below are no-ops unless CHECKING is enabled.
+        switch(m) {
+            case READ:
+                checkState(ACTIVE, COMMITTED, ABORTED) ;
+                break ;
+            case WRITE:
+                // If bad, force abort?
+                checkState(COMMITTED, ABORTED) ; 
+                break ;
+        }
+        _complete(transaction.getTxnId(), getDataState());
+        switch(m) {
+            case READ:
+                // NOTE(review): internalComplete calls _complete again, so for READ
+                // the subclass sees _complete twice on this path - confirm intended.
+                internalComplete(transaction);
+                break ;
+            case WRITE:
+                // complete happened in the commit or abort.
+                break ;
+        }
+    }
+    
+    /**
+     * Shut the component down: the subclass hook {@code _shutdown} runs first,
+     * then this class drops its thread-local holders.
+     */
+    @Override
+    public final void shutdown() {
+        _shutdown() ;
+        clearInternal() ;
+    }
+    
+    /**
+     * Detach the current transaction from the calling thread.
+     *
+     * @return a {@link SysTransState} capturing this component, the thread's
+     *         transaction and its component state; or {@code null} if no
+     *         transaction state is available for this thread
+     */
+    @Override
+    public final SysTransState detach() {
+        if ( getTxnState() == null )
+            return null ;
+        checkState(ACTIVE) ;
+        setTrackTxn(DETACHED) ;
+        Transaction txn = getTransaction() ;
+        X data = getDataState() ;
+        SysTransState detached = new SysTransState(this, txn, data) ;
+        // Everything needed has been captured: clear this thread's thread-locals.
+        releaseThreadState() ;
+        return detached ;
+    }
+    
+    /**
+     * Attach previously detached state to the calling thread: the transaction
+     * and component state held by {@code state} become this thread's values.
+     */
+    @Override
+    public void attach(SysTransState state) {
+        // SysTransState hands the component state back untyped; cast it to X.
+        @SuppressWarnings("unchecked")
+        X restored = (X)state.getState() ;
+        setTransaction(state.getTransaction()) ;
+        setDataState(restored) ;
+        setTrackTxn(ACTIVE) ;
+    }
+    
+    // -- Access object members.
+
+    /**
+     * Holder for the three per-thread values of a component: the tracked
+     * state, the transaction, and the component's own state. All fields are final.
+     */
+    public static class ComponentState<X> {
+        final TxnState state ;
+        final Transaction txn ;
+        final X componentState ;
+
+        ComponentState(TxnState trackedState, Transaction transaction, X data) {
+            state = trackedState ;
+            txn = transaction ;
+            componentState = data ;
+        }
+    }
+    
+    public ComponentState<X> getComponentState() {
+        return new ComponentState<>(getTrackTxn(), getTransaction(), getDataState()) ; 
+    }
+    
+    /** Install the three values held by {@code state} as this thread's values. */
+    public void setComponentState(ComponentState<X> state) {
+        setTrackTxn(state.state) ;
+        setTransaction(state.txn) ;
+        setDataState(state.componentState) ;
+    }
+
+    /** Remove this thread's entries from all of the component's thread-locals. */
+    protected void releaseThreadState() {
+        // trackTxn only exists when CHECKING is enabled.
+        if ( trackTxn != null ) {
+            trackTxn.remove() ;
+        }
+        componentState.remove() ;
+
+        // Removing threadTxn is very important, otherwise the memory usage grows.
+        // Not clear why. A Transaction has an internal AtomicReference; replacing
+        // the AtomicReference with a plain member variable slows the growth down greatly.
+        //
+        // JVM versions noted by the original author alongside this observation:
+        //   java version "1.8.0_31"
+        //     Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
+        //     Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
+        //   openjdk version "1.8.0_40-internal"
+        //     OpenJDK Runtime Environment (build 1.8.0_40-internal-b09)
+        //     OpenJDK 64-Bit Server VM (build 25.40-b13, mixed mode)
+        threadTxn.remove() ;
+    }
+    
+    /**
+     * Drop the references to all three thread-local holders (used by {@code shutdown}).
+     * The accessors of this class dereference these fields without a null check.
+     */
+    protected void clearInternal() {
+        componentState = null ;
+        threadTxn = null ;
+        trackTxn = null ;
+    }
+    
+    //protected ComponentState<X> getState()
+    
+    /** This thread's component state; {@code null} if none has been set. */
+    protected X getDataState() {
+        return componentState.get() ;
+    }
+
+    /** Set this thread's component state. */
+    protected void setDataState(X data) {
+        componentState.set(data) ;
+    }
+
+    /** This thread's transaction; {@code null} if none has been set. */
+    protected Transaction getTransaction() {
+        return threadTxn.get() ;
+    }
+
+    /** Set this thread's transaction. */
+    protected void setTransaction(Transaction txn) {
+        threadTxn.set(txn) ;
+    }
+    
+    /** Record the tracked state for this thread; does nothing unless CHECKING is enabled. */
+    private void setTrackTxn(TxnState newState) {
+        if ( CHECKING )
+            trackTxn.set(newState) ;
+    }
+
+    // This is our record of the state - it is not necessarily the transaction's
+    // view during changes.  This class tracks the expected incoming state and
+    // the transaction.
+    /** The tracked state for this thread, or {@code null} when CHECKING is disabled. */
+    private TxnState getTrackTxn() {
+        return CHECKING ? trackTxn.get() : null ;
+    }
+    // -- Access object members.
+    
+    // XXX Align to javadoc in TransactionalComponent.
+    
+    /* There are two lifecycles, one for write transaction, one
+     * for read transactions. This affects how transaction end so
+     * when/if promoted read->write transactions happen, a promoted
+     * transaction will follow the write lifecycle. 
+     * 
+     * In both lifecycles, the implementer can assume that calls
+     * happen at the right points and are called only as needed.  The framework
+     * takes care of checking.  
+     * 
+     * Read lifecycle:
+     * A read transaction may be just begin(READ)-end() but may also
+     * have commit or abort before end. The _commitRead and _abortRead
+     * calls note if an explicit commit or abort occurs but may not be
+     * called. _endRead is always called exactly once.
+     *  
+     * _commitRead
+     * _abortRead
+     * _endRead
+     * _complete
+     * 
+     * Write lifecycle:
+     * A write transaction must have a commit() or abort() before end().
+     * The framework will check this.
+     * 
+     * If the transaction commits:
+     * _commitPrepareWrite
+     * _commitWrite -- The transaction is 
+     * _commitEndWrite
+     * 
+     * If the transaction aborts:
+     * _abortWrite
+     * 
+     * After any lifecycle, a final call of
+     * _complete()
+     * 
+     * indicates the transaction has fully finished.
+     * 
+     * Typically, an implementation does not need to take action in every call. 
+     */
+    
+//    /**
+//     * 
+//     * @param readWrite
+//     * @param txnId
+//     * @return
+//     */
+//    protected abstract X           _begin(ReadWrite readWrite, TxnId txnId) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     * @return
+//     */
+//    protected abstract ByteBuffer  _commitPrepareWrite(TxnId txnId, X  state) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     */
+//    protected abstract void        _commitWrite(TxnId txnId, X state) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     */
+//    protected abstract void        _commitEndWrite(TxnId txnId, X state) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     */
+//    protected abstract void        _abortWrite(TxnId txnId, X state) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     * @return
+//     */
+//    protected abstract ByteBuffer  _commitRead(TxnId txnId, X  state) ;
+//    
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     * @return
+//     */
+//    protected abstract ByteBuffer  _abortRead(TxnId txnId, X  state) ;
+//
+//    /**
+//     * 
+//     * @param txnId
+//     * @param state
+//     */
+//    protected abstract void        _complete(TxnId txnId, X state) ;
+//    
+//    /**
+//     * 
+//     */
+//    protected abstract void        _shutdown() ;
+
+    protected abstract X           _begin(ReadWrite readWrite, TxnId txnId) ;
+    protected abstract X           _promote(TxnId txnId, X oldState) ;
+    protected abstract ByteBuffer  _commitPrepare(TxnId txnId, X  state) ;
+    protected abstract void        _commit(TxnId txnId, X state) ;
+    protected abstract void        _commitEnd(TxnId txnId, X state) ;
+    protected abstract void        _abort(TxnId txnId, X state) ;
+    protected abstract void        _complete(TxnId txnId, X state) ;
+    protected abstract void        _shutdown() ;
+    
+    /**
+     * The mode of this thread's transaction.
+     * Dereferences the thread's transaction without a null check.
+     */
+    protected ReadWrite getReadWriteMode() {
+        return getTransaction().getMode() ;
+    }
+    
+    /**
+     * Test whether this thread has a transaction whose state is one of
+     * ACTIVE, DETACHED, PREPARE, ABORTED, COMMIT or COMMITTED.
+     *
+     * @return {@code false} if there is no transaction state for this thread or
+     *         the state is INACTIVE, END_ABORTED or END_COMMITTED; {@code true} otherwise
+     * @throws InternalErrorException if the state is not one of the listed values
+     */
+    protected boolean isActiveTxn() {
+        // Read the state once so that the null check and the switch act on the
+        // same value (a second read could differ, or be null, if it changed in between).
+        TxnState txnState = getTxnState() ;
+        if ( txnState == null )
+            return false ;
+        switch(txnState) {
+            case INACTIVE: case END_ABORTED: case END_COMMITTED:
+                return false ;
+            case ACTIVE: case DETACHED: case PREPARE: case ABORTED: case COMMIT: case COMMITTED:
+                return true ;
+            // No default branch: get the compiler to check all states are covered.
+        }
+        // Should not happen.
+        throw new InternalErrorException("Unclear transaction state") ;
+    }
+
+    /** The negation of {@link #isWriteTxn()}. */
+    protected boolean isReadTxn() {
+        boolean write = isWriteTxn() ;
+        return ! write ;
+    }
+    
+    /**
+     * Whether this thread's transaction reports itself as a write transaction.
+     * Dereferences the thread's transaction without a null check.
+     */
+    protected boolean isWriteTxn() {
+        return getTransaction().isWriteTxn() ;
+    }
+    
+    /**
+     * Require an active transaction on this thread.
+     *
+     * @throws TransactionException if {@link #isActiveTxn()} is false
+     */
+    protected void checkTxn() {
+        if ( isActiveTxn() )
+            return ;
+        throw new TransactionException("Not in a transaction") ;
+    }
+
+//    protected void requireWriteTxn() {
+//        Transaction txn = getTransaction();
+//        if ( txn == null )
+//            throw new TransactionException("Not a transaction");
+//        else 
+//            txn.requireWriteTxn() ;
+//    }
+    
+    /**
+     * Require that this thread has a transaction and delegate the write check
+     * to the transaction's {@code requireWriteTxn()}.
+     *
+     * @throws TransactionException if this thread has no transaction
+     */
+    protected void checkWriteTxn() {
+        Transaction current = getTransaction() ;
+        if ( current == null )
+            throw new TransactionException("Not a transaction") ;
+        current.requireWriteTxn() ;
+    }
+
+    // -- Access object members.
+    
+    /** The state reported by this thread's transaction, or {@code null} if the thread has no transaction. */
+    private TxnState getTxnState() {
+        Transaction current = getTransaction() ;
+        return ( current == null ) ? null : current.getState() ;
+    }
+
+    /**
+     * When CHECKING is enabled, require that {@code transaction} is the very
+     * object recorded for this thread (identity comparison).
+     *
+     * @throws TransactionException if it is a different object
+     */
+    private void checkAligned(Transaction transaction) {
+        if ( CHECKING ) {
+            boolean sameTxn = ( getTransaction() == transaction ) ;
+            if ( ! sameTxn )
+                throw new TransactionException("Transaction is not the transaction of the thread") ;
+        }
+    }
+    
+    /**
+     * When CHECKING is enabled, require the tracked state to be {@code expected}.
+     *
+     * @throws TransactionException if the tracked state differs
+     */
+    private void checkState(TxnState expected) {
+        if ( CHECKING ) {
+            TxnState actual = getTrackTxn() ;
+            if ( actual != expected )
+                throw new TransactionException("Transaction is in state "+actual+": expected state "+expected) ;
+        }
+    }
+
+    /**
+     * When CHECKING is enabled, require the tracked state to be one of the two given states.
+     *
+     * @throws TransactionException if the tracked state is neither
+     */
+    private void checkState(TxnState expected1, TxnState expected2) {
+        if ( CHECKING ) {
+            TxnState actual = getTrackTxn() ;
+            boolean ok = ( actual == expected1 || actual == expected2 ) ;
+            if ( ! ok )
+                throw new TransactionException("Transaction is in state "+actual+": expected state "+expected1+" or "+expected2) ;
+        }
+    }
+
+    // Three fixed-arity overloads are used instead of varargs (original note: "undue worry?").
+    /**
+     * When CHECKING is enabled, require the tracked state to be one of the three given states.
+     *
+     * @throws TransactionException if the tracked state is none of them
+     */
+    private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
+        if ( CHECKING ) {
+            TxnState actual = getTrackTxn() ;
+            boolean ok = ( actual == expected1 || actual == expected2 || actual == expected3 ) ;
+            if ( ! ok )
+                throw new TransactionException("Transaction is in state "+actual+": expected state "+expected1+", "+expected2+" or "+expected3) ;
+        }
+    }
+
+    //    private void checkStateNot(State unexpected) {
+//        State s = state.get();
+//        if ( s == unexpected )
+//            throw new TransactionException("Transaction in unexpected state "+s) ;
+//    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
new file mode 100644
index 0000000..a07852e
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalComponentWrapper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn ;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * A {@link TransactionalComponent} that forwards every operation, unchanged,
+ * to another component. Subclasses can override selected operations and
+ * inherit plain delegation for the rest.
+ */
+public class TransactionalComponentWrapper implements TransactionalComponent {
+    /** The component that every call is forwarded to. */
+    protected final TransactionalComponent other ;
+
+    public TransactionalComponentWrapper(TransactionalComponent other) {
+        this.other = other ;
+    }
+
+    // ---- Recovery
+
+    @Override
+    public void startRecovery()                 { other.startRecovery() ; }
+
+    @Override
+    public void recover(ByteBuffer ref)         { other.recover(ref) ; }
+
+    @Override
+    public void finishRecovery()                { other.finishRecovery() ; }
+
+    @Override
+    public void cleanStart()                    { other.cleanStart() ; }
+
+    // ---- Identity
+
+    @Override
+    public ComponentId getComponentId()         { return other.getComponentId() ; }
+
+    // ---- Transaction lifecycle
+
+    @Override
+    public void begin(Transaction transaction)  { other.begin(transaction) ; }
+
+    @Override
+    public boolean promote(Transaction transaction) {
+        return other.promote(transaction) ;
+    }
+
+    @Override
+    public ByteBuffer commitPrepare(Transaction transaction) {
+        return other.commitPrepare(transaction) ;
+    }
+
+    @Override
+    public void commit(Transaction transaction)     { other.commit(transaction) ; }
+
+    @Override
+    public void commitEnd(Transaction transaction)  { other.commitEnd(transaction) ; }
+
+    @Override
+    public void abort(Transaction transaction)      { other.abort(transaction) ; }
+
+    @Override
+    public void complete(Transaction transaction)   { other.complete(transaction) ; }
+
+    // ---- Thread switching
+
+    @Override
+    public SysTransState detach()                   { return other.detach() ; }
+
+    @Override
+    public void attach(SysTransState systemState)   { other.attach(systemState) ; }
+
+    // ---- Shutdown
+
+    @Override
+    public void shutdown()                          { other.shutdown() ; }
+
+    /** The wrapped component's string form, prefixed with {@code "W:"}. */
+    @Override
+    public String toString() {
+        return "W:"+other.toString() ;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
new file mode 100644
index 0000000..920ee05
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalMRSW.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import java.nio.ByteBuffer ;
+import java.util.concurrent.locks.Lock ;
+import java.util.concurrent.locks.ReadWriteLock ;
+import java.util.concurrent.locks.ReentrantReadWriteLock ;
+
+import org.apache.jena.atlas.logging.Log ;
+
+import org.apache.jena.query.ReadWrite ;
+
+/** Implementation of the component interface for {@link TransactionalComponent}
+ *  based on a multiple-reader / single-writer lock.
+ *  Useful for in-memory transactions that do not provide durability or abort (undo). 
+ *  When retrofitting to other systems, that may be the best that can be done. 
+ */
+public class TransactionalMRSW extends TransactionalComponentLifecycle<Object> {
+    // MRSW implementation of TransactionMVCC
+    // XXX Update to Jena style TransactionalLock
+    // Not final: _shutdown() clears the reference.
+    private ReadWriteLock lock = new ReentrantReadWriteLock() ;
+    
+    public TransactionalMRSW(ComponentId componentId) {
+        super(componentId) ;
+    }
+
+    // ---- Recovery phase
+    @Override
+    public void startRecovery() {}
+    
+    /** Nothing is recoverable in this component: the call is logged and ignored. */
+    @Override
+    public void recover(ByteBuffer ref) {
+        Log.warn(this, "Called to recover a transaction (ignored)") ; 
+    }
+
+    @Override
+    public void finishRecovery() { }
+    
+    @Override 
+    public void cleanStart() {}
+    
+    /** The write lock if this thread's transaction mode is WRITE, otherwise the read lock. */
+    private Lock getLock() {
+        return ( ReadWrite.WRITE.equals(getReadWriteMode()) ) ? lock.writeLock() : lock.readLock() ;
+    }
+    
+    /**
+     * Take the lock matching the transaction mode (blocking until available),
+     * call the start hook, and return a fresh state object.
+     */
+    @Override
+    protected Object _begin(ReadWrite readWrite, TxnId thisTxnId) {
+        Lock txnLock = getLock() ;
+        // This is the point that makes this MRSW (readers OR writer), not MR+SW (readers and a writer)
+        txnLock.lock();
+        if ( isWriteTxn() )
+            startWriteTxn(); 
+        else 
+            startReadTxn(); 
+        return createState();                    
+    }
+
+    private Object createState() {
+        return new Object();
+    }
+    
+    /**
+     * Try to swap the read lock for the write lock.
+     *
+     * @return a fresh state object on success, or {@code null} if the write
+     *         lock could not be taken. The superclass {@code promote} treats
+     *         null as "promotion failed".
+     */
+    @Override
+    protected Object _promote(TxnId txnId, Object state) {
+        // We have a read lock, the transaction coordinator has said 
+        // it's OK (from its point-of-view) to promote so this should succeed.
+        // We have a read lock - there are no other writers.
+        // NOTE(review): ReentrantReadWriteLock documents that upgrading from a
+        // read lock to the write lock is not possible, so this tryLock is
+        // expected to fail while this thread still holds the read lock - confirm.
+        boolean b = lock.writeLock().tryLock();
+        if ( ! b ) {
+            Log.warn(this, "Failed to promote");  
+            // Must be null, not 'false': a boxed Boolean.FALSE is a non-null
+            // Object and would be reported by promote() as a successful promotion.
+            return null;
+        }
+        lock.readLock().unlock(); 
+        return createState(); 
+    }
+
+    // Checks.
+    // Hooks for subclasses; called from _begin and from the end-of-transaction cleanup.
+    
+    protected void startReadTxn()   {}
+    protected void startWriteTxn()  {}
+    protected void finishReadTxn()  {}
+    protected void finishWriteTxn() {}
+
+    /** Nothing to prepare: always {@code null}. */
+    @Override
+    protected ByteBuffer _commitPrepare(TxnId txnId, Object obj) {
+        return null ;
+    }
+
+    // NOTE(review): _commit and _commitEnd both run clearup(), which unlocks.
+    // If both are invoked for the same transaction on the same thread, the
+    // second unlock would be on a lock no longer held - confirm the call pattern.
+    @Override
+    protected void _commit(TxnId txnId, Object obj) {
+        clearup() ;
+    }
+
+    @Override
+    protected void _commitEnd(TxnId txnId, Object obj) {
+        clearup() ;
+    }
+
+    @Override
+    protected void _abort(TxnId txnId, Object obj) {
+        clearup() ;
+    }
+
+    @Override
+    protected void _complete(TxnId txnId, Object obj) {
+    }
+
+    @Override
+    protected void _shutdown() {
+        lock = null ;
+    }
+
+    /** Call the finish hook for the transaction kind, then release the matching lock. */
+    private void clearup() {
+        Lock txnLock = getLock() ;
+        if ( isWriteTxn() )
+            finishWriteTxn(); 
+        else 
+            finishReadTxn(); 
+        txnLock.unlock(); 
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
new file mode 100644
index 0000000..9b64799
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TransactionalSystem.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+import org.apache.jena.query.ReadWrite ;
+import org.seaborne.dboe.transaction.Transactional ;
+
+/** Implementation side of a {@link Transactional}.
+ *  {@link Transactional} presents the application facing view
+ *  whereas this has all the possible steps of an implementation.  
+ *  Normally, the implementation of {@link #commit} is split up.
+ */
+public interface TransactionalSystem extends Transactional {
+    
+    /** Commit as the two steps {@link #commitPrepare} then {@link #commitExec}. */
+    @Override
+    public default void commit() {
+        commitPrepare() ;
+        commitExec() ;
+    }
+    
+    /** Do the 2-phase "prepare" step after which
+     *  the transaction coordinator decides whether to commit
+     *  or abort.  A TransactionalSystem must be prepared for
+     *  both possibilities.   
+     */
+    public void commitPrepare();
+
+    /** Do the 2-phase "commit" step */
+    public void commitExec();
+
+    /** Suspend this transaction, detaching from the current thread.
+     * A new transaction on this thread can be performed but the detached
+     * transaction still exists and if it is a write transaction
+     * it can still block other write transactions.
+     */
+    public TransactionCoordinatorState detach() ;
+    
+    /** 
+     * Attach a transaction to this thread.
+     * A transaction system implementation usually imposes a rule that 
+     * only one thread can have a transaction attached at a time. 
+     */
+    public void attach(TransactionCoordinatorState coordinatorState) ;
+
+    /** Get the associated {@link TransactionCoordinator} */
+    public TransactionCoordinator getTxnMgr() ; 
+    
+    /**  Return the Read/Write state from the point of view of the caller.
+     * Return null when not in a transaction.
+     */
+    public ReadWrite getState() ;
+    
+    /** In a transaction exactly when {@link #getState} is not null. */
+    @Override
+    public default boolean isInTransaction() { return getState() != null ; }  
+    
+    /** Return an information view of the transaction for this thread, if any.
+     *  Returns null when there is no active transaction for this thread. 
+     */
+    public TransactionInfo getTransactionInfo() ;
+    
+    /** Return the transaction object for this thread.  
+     *  Low-level use only.
+     *  To get information about the current transaction, call {@link #getTransactionInfo}.
+     */
+    public Transaction getThreadTransaction() ;
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/1dabea3a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TxnId.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TxnId.java b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TxnId.java
new file mode 100644
index 0000000..e05f084
--- /dev/null
+++ b/jena-db/jena-dboe-transaction/src/main/java/org/seaborne/dboe/transaction/txn/TxnId.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.seaborne.dboe.transaction.txn;
+
+
+/**
+ * {@code TxnId} is an identifier for a transaction.
+ * A component in a transaction can use it as a unique key.
+ * The {@code TxnId}
+ * <ul>
+ * <li>must be unique across a JVM run
+ * <li>must be unique across JVM runs if used as a persistent name
+ * <li>must provide value equality semantics (two {@code TxnId} are {@code .equals}
+ * if they have the same value, not only if they are the same object)
+ * </ul>
+ * <p>
+ * It is preferable that the TxnId is globally unique over time and space.
+ */
+public interface TxnId {
+//    public static TxnId create() { return TxnIdSimple.create() ; }
+//    
+//    public static TxnId create(byte[] bytes) {
+//        switch(bytes.length) {
+//            case 8:  return TxnIdSimple.create(bytes) ;
+//            case 16: return TxnIdUuid.create(bytes) ;
+//            default:
+//                throw new TransactionException("TxnId bytes unrecognized: length="+bytes.length) ;
+//        }
+//    }
+//    
+    // Reminder to implement.
+    // (Redeclared here only as a prompt to implementers of value equality.)
+    @Override
+    public int hashCode() ;
+    @Override
+    public boolean equals(Object other) ;
+    
+    public String name() ;
+    public byte[] bytes() ;
+    /**  A long that is a subset of, or all of, the bytes.
+     * This should be unique for the lifetime of the transaction and
+     * ideally unique per system instance. It is not a persistent record
+     * of a transaction, it is for a transaction identifier in running code.
+     * ("system" may be larger than one JVM).    
+     */
+    public long runtime() ;
+    
+}
+


Mime
View raw message