jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject svn commit: r1152650 - in /incubator/jena/Experimental/TxTDB/trunk: src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/ src/main/java/com/hp/hpl/jena/tdb/transaction/ src/test/java/com/hp/hpl/jena/tdb/transaction/
Date Sun, 31 Jul 2011 21:35:52 GMT
Author: andy
Date: Sun Jul 31 21:35:48 2011
New Revision: 1152650

URL: http://svn.apache.org/viewvc?rev=1152650&view=rev
Log:
Bug fix for dealing with delayed transaction queue under heavy load.

Removed:
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/TxnState.java
Modified:
    incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
    incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
    incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java

Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1152650&r1=1152649&r2=1152650&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Sun Jul 31 21:35:48 2011
@@ -7,6 +7,11 @@ public class DevTx
     // Seen:
     //  TestTransSystem - writer inconsistencies?
     
+    // * Version per dataset
+    //   Can flush chnages quietly when all up to X are done.
+    //   Wilsee in transaction stack, not base database. 
+    //   Need to be able to swap/drop trnastion layer.
+    
     // Tasks:
     // * Check journal truncates to last commit.
     //   Journal needs reset markers

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1152650&r1=1152649&r2=1152650&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Sun Jul 31 21:35:48 2011
@@ -229,7 +229,7 @@ public class JournalControl
         replay(journal, dsg) ;
     }
     
-    private static void replay(Journal journal, DatasetGraphTDB dsg)
+    public static void replay(Journal journal, DatasetGraphTDB dsg)
     {
         journal.position(0) ;
         dsg.getLock().enterCriticalSection(Lock.WRITE) ;

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1152650&r1=1152649&r2=1152650&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Sun Jul 31 21:35:48 2011
@@ -174,13 +174,13 @@ public class Transaction
     public void removeIterator(Iterator<?> iter)    { iterators.remove(iter) ; }
     public List<Iterator<?>> iterators()            { return Collections.unmodifiableList(iterators) ; }
     
-    public Iterator<Transactional> components()
+    public List<Transactional> components()
     {
         // FIX NEEDED
         List<Transactional> x = new ArrayList<Transactional>() ;
         x.addAll(nodeTableTrans) ;
         x.addAll(blkMgrs) ;
-        return x.iterator() ;
+        return x ;
     }
     
     public Journal getJournal()    { return journal ; }

Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1152650&r1=1152649&r2=1152650&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Sun Jul 31 21:35:48 2011
@@ -7,19 +7,20 @@
 package com.hp.hpl.jena.tdb.transaction;
 
 import static com.hp.hpl.jena.tdb.ReadWrite.READ ;
+import static com.hp.hpl.jena.tdb.transaction.TransactionManager.TxnPoint.* ;
 import static com.hp.hpl.jena.tdb.ReadWrite.WRITE ;
 import static com.hp.hpl.jena.tdb.sys.SystemTDB.syslog ;
 import static java.lang.String.format ;
 
 import java.util.ArrayList ;
 import java.util.HashSet ;
-import java.util.Iterator ;
 import java.util.List ;
 import java.util.Set ;
 import java.util.concurrent.BlockingQueue ;
 import java.util.concurrent.LinkedBlockingDeque ;
 import java.util.concurrent.Semaphore ;
 
+import org.openjena.atlas.lib.Pair ;
 import org.openjena.atlas.logging.Log ;
 import org.slf4j.Logger ;
 import org.slf4j.LoggerFactory ;
@@ -42,13 +43,18 @@ public class TransactionManager
     // consume too much memory.
     
     // Make a feature of the transaction.
+    // Chnage to one list of (txn, state change.).
     private boolean recordHistory = false ;
-    private List<Transaction> transactionBegin ;
-    private List<Transaction> transactionEnd ;          // All transactions.
-    private List<Transaction> transactionQueued ;       // Writes, delayed finalization
-    private List<Transaction> transactionUnqueue ;         // Reads,that flushed the delayed queue
-//    private List<Transaction> transactionCommit ;       // Write transaction.
-//    private List<Transaction> transactionAbort ;        // Write transaction.
+    
+    enum TxnPoint { BEGIN, COMMIT, ABORT, CLOSE, QUEUE, UNQUEUE }
+    private List<Pair<Transaction, TxnPoint>> transactionStateTransition ;
+    
+    private void record(Transaction txn, TxnPoint state)
+    {
+        if ( ! recordHistory ) return ;
+        initRecordingState() ;
+        transactionStateTransition.add(new Pair<Transaction, TxnPoint>(txn, state)) ;
+    }
     
     // Transactions that have commited (and the journal is written) but haven't
     // writted back to the main database. 
@@ -94,6 +100,7 @@ public class TransactionManager
     
     private interface TSM
     {
+        // Quert unqueue?
         void transactionStarts(Transaction txn) ;
         void transactionFinishes(Transaction txn) ;
         void transactionCloses(Transaction txn) ;
@@ -149,9 +156,9 @@ public class TransactionManager
     {
         // Later - record on one list the state transition.
         @Override
-        public void transactionStarts(Transaction txn)      { if ( recordHistory ) transactionBegin.add(txn) ; }
+        public void transactionStarts(Transaction txn)      { record(txn, BEGIN) ; }
         @Override
-        public void transactionFinishes(Transaction txn)    { if ( recordHistory ) transactionEnd.add(txn) ; }
+        public void transactionFinishes(Transaction txn)    { record(txn, CLOSE) ; }
     }
     
     private TSM[] actions = new TSM[] { 
@@ -199,6 +206,7 @@ public class TransactionManager
                 throw new TDBTransactionException(e) ;
             }
         }
+        // entry synchronized part
         return begin$(mode, label) ;
     }
         
@@ -214,9 +222,10 @@ public class TransactionManager
 //            //   create new transaction 
 //        }
         
-        
         if ( mode == WRITE && activeWriters > 0 )    // Guard
             throw new TDBTransactionException("Existing active write transaction") ;
+
+        // Even flush queue here.
         
         DatasetGraphTDB dsg = baseDataset ;
         // *** But, if there are pending, committed transactions, use latest.
@@ -226,12 +235,11 @@ public class TransactionManager
         Transaction txn = createTransaction(dsg, mode, label) ;
         DatasetGraphTxn dsgTxn = (DatasetGraphTxn)new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
         txn.setActiveDataset(dsgTxn) ;
-        
-        Iterator<Transactional> iter = dsgTxn.getTransaction().components() ;
-        
+
+        // TODO Match with other oepration states
         // Notify everyone we're starting.
-        for ( ; iter.hasNext() ; )
-            iter.next().begin(dsgTxn.getTransaction()) ;
+        for ( Transactional component : dsgTxn.getTransaction().components() )
+            component.begin(dsgTxn.getTransaction()) ;
 
         noteStartTxn(txn) ;
         log("begin",txn) ;
@@ -266,6 +274,7 @@ public class TransactionManager
                     // messey - combine with state machine. 
                     processDelayedReplayQueue(transaction) ;
                     enactTransaction(transaction) ;
+                    JournalControl.replay(transaction) ;
                 }
                 else
                 {
@@ -304,17 +313,13 @@ public class TransactionManager
     /** The stage in a commit after commiting - make the changes permanent in the base data */ 
     private void enactTransaction(Transaction transaction)
     {
-        // Flush the queue first.
+        // Flush the queue first.  Happens in Transaction.commit
         // Really, really do it!
-        Iterator<Transactional> iter = transaction.components() ;
-        for ( ; iter.hasNext() ; )
+        for ( Transactional x : transaction.components() )
         {
-            Transactional x = iter.next() ;
             x.commitEnact(transaction) ;
             x.commitClearup(transaction) ;
         }
-        // This cleans up the journal as well.
-        JournalControl.replay(transaction) ;
     }
 
     private void processDelayedReplayQueue(Transaction txn)
@@ -328,18 +333,16 @@ public class TransactionManager
                 if ( log() ) log(format("Pending transactions: R=%d / W=%d", activeReaders, activeWriters), txn) ;
             return ;
         }
+//        if ( queue.size() > 1 )
+//            System.out.println("\nQuery length: "+queue.size()) ;
         while ( queue.size() > 0 )
         {
-            if ( recordHistory )
-                transactionUnqueue.add(txn) ;
-            
             // Currently, replay is replay everything
             // so looping on a per-transaction basis is
             // pointless but harmless.  
             
             try {
                 Transaction txn2 = queue.take() ;
-               
                 if ( txn2.getMode() == READ )
                     continue ;
                 log("Flush delayed commit", txn2) ;
@@ -348,9 +351,14 @@ public class TransactionManager
                 // **** Related NodeFileTrans: writes at "prepare" 
                 enactTransaction(txn2) ;
                 commitedAwaitingFlush.remove(txn2) ;
+                
+                // Drain queue - in fact, everything is done by one "enactTransaction"
+                
             } catch (InterruptedException ex)
             { Log.fatal(this, "Interruped!", ex) ; }
         }
+        // Whole journal to base database
+        JournalControl.replay(txn.getJournal(), baseDataset) ;
     }
 
     synchronized
@@ -371,12 +379,12 @@ public class TransactionManager
     // TODO Collapse these.
     private void noteStartTxn(Transaction transaction)
     {
-        transactionStarts(transaction) ;
         switch (transaction.getMode())
         {
             case READ : readerStarts(transaction) ; break ;
             case WRITE : writerStarts(transaction) ; break ;
         }
+        transactionStarts(transaction) ;
     }
 
     private void noteTxnCommit(Transaction transaction)
@@ -419,23 +427,13 @@ public class TransactionManager
     public void clearRecordingState()
     {
         initRecordingState() ;
-
-        transactionBegin.clear() ;
-        transactionEnd.clear() ;
-        transactionQueued.clear() ;
-        transactionUnqueue.clear() ;
+        transactionStateTransition.clear() ;
     }
     
     private void initRecordingState()
     {
-        if ( transactionBegin == null )
-            transactionBegin = new ArrayList<Transaction>() ;
-        if ( transactionEnd == null )
-            transactionEnd = new ArrayList<Transaction>() ;
-        if ( transactionQueued == null )
-            transactionQueued = new ArrayList<Transaction>() ;
-        if ( transactionUnqueue == null )
-            transactionUnqueue = new ArrayList<Transaction>() ;
+        if ( transactionStateTransition == null )
+            transactionStateTransition = new ArrayList<Pair<Transaction, TxnPoint>>() ;
     }
     
 //    public List<Transaction> getBeginTransactionRecord() { return transactionBegin ; }

Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1152650&r1=1152649&r2=1152650&view=diff
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Sun Jul 31 21:35:48 2011
@@ -66,7 +66,7 @@ public class TestTransSystem
     static final int readerSeqRepeats       = 5 ;    
     static final int readerMaxPause         = 50 ;
     
-    static final int writerAbortSeqRepeats  = 2 ;
+    static final int writerAbortSeqRepeats  = 0 ;
     static final int writerCommitSeqRepeats = 5 ;
     static final int writerMaxPause         = 20 ;
     
@@ -87,7 +87,8 @@ public class TestTransSystem
         if ( ! progress )
             System.out.println("START") ;
         
-        final int N = (Iterations < 10) ? 1 : Iterations / 10 ;
+        int N = (Iterations < 10) ? 1 : Iterations / 10 ;
+        N = Math.min(N, 100) ;
         int i ;
         
         for ( i = 0 ; i < Iterations ; i++ )



Mime
View raw message