Return-Path: X-Original-To: apmail-incubator-jena-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-jena-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 88CF76E9A for ; Wed, 27 Jul 2011 21:03:28 +0000 (UTC) Received: (qmail 42391 invoked by uid 500); 27 Jul 2011 21:03:28 -0000 Delivered-To: apmail-incubator-jena-commits-archive@incubator.apache.org Received: (qmail 42373 invoked by uid 500); 27 Jul 2011 21:03:28 -0000 Mailing-List: contact jena-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jena-dev@incubator.apache.org Delivered-To: mailing list jena-commits@incubator.apache.org Received: (qmail 42366 invoked by uid 99); 27 Jul 2011 21:03:27 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jul 2011 21:03:27 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jul 2011 21:03:25 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C678123888CE; Wed, 27 Jul 2011 21:03:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1151627 - in /incubator/jena/Experimental/TxTDB/trunk: ./ resources2/ src/main/java/com/hp/hpl/jena/tdb/transaction/ src/test/java/com/hp/hpl/jena/tdb/transaction/ Date: Wed, 27 Jul 2011 21:03:05 -0000 To: jena-commits@incubator.apache.org From: andy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110727210305.C678123888CE@eris.apache.org> Author: andy Date: Wed Jul 27 21:03:03 2011 New Revision: 1151627 URL: http://svn.apache.org/viewvc?rev=1151627&view=rev Log: (empty) Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Modified: incubator/jena/Experimental/TxTDB/trunk/log4j.properties URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/log4j.properties (original) +++ incubator/jena/Experimental/TxTDB/trunk/log4j.properties Wed Jul 27 21:03:03 2011 @@ -1,7 +1,7 @@ log4j.rootLogger=INFO, stdlog log4j.appender.stdlog=org.apache.log4j.ConsoleAppender -## log4j.appender.stdlog.target=System.err +log4j.appender.stdlog.target=System.out log4j.appender.stdlog.layout=org.apache.log4j.PatternLayout log4j.appender.stdlog.layout.ConversionPattern=%d{HH:mm:ss} %-5p %-25c{1} :: %m%n @@ -17,8 +17,11 @@ log4j.logger.com.hp.hpl.jena=WARN log4j.logger.org.openjena.riot=INFO # TDB +# TDB syslog. +log4j.logger.TDB=INFO + log4j.logger.com.hp.hpl.jena.tdb=INFO -#log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL +log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL # Joseki server log4j.logger.org.joseki=INFO Modified: incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties (original) +++ incubator/jena/Experimental/TxTDB/trunk/resources2/log4j.properties Wed Jul 27 21:03:03 2011 @@ -16,5 +16,9 @@ log4j.logger.com.hp.hpl.jena.tdb.loader= log4j.logger.com.hp.hpl.jena=WARN log4j.logger.org.openjena.riot=INFO +# TDB +log4j.logger.com.hp.hpl.jena.tdb=INFO +log4j.logger.com.hp.hpl.jena.tdb.transaction=ALL + # Joseki server log4j.logger.org.joseki=INFO Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Wed Jul 27 21:03:03 2011 @@ -60,12 +60,12 @@ public class JournalControl // Do we need to recover? Journal journal = findJournal(dsg) ; - if ( journal != null ) - { - for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() ) - recoverNodeDat(dsg, fileRef) ; - recoverSystemJournal(journal, dsg) ; - } + if ( journal == null ) + return ; + + for ( FileRef fileRef : dsg.getConfig().nodeTables.keySet() ) + recoverNodeDat(dsg, fileRef) ; + recoverSystemJournal(journal, dsg) ; // Recovery complete. Tidy up. Node journal files have already been handled. if ( journal.getFilename() != null ) Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Wed Jul 27 21:03:03 2011 @@ -43,13 +43,30 @@ public class TransactionManager static long transactionId = 1 ; - private int readers = 0 ; - private int writers = 0 ; // 0 or 1 + private int activeReaders = 0 ; + private int activeWriters = 0 ; // 0 or 1 // Misc stats private int finishedReads = 0 ; private int committedWrite = 0 ; private int abortedWrite = 0 ; + + public static class State + { + final public int activeReaders ; + final public int activeWriters ; + final public int finishedReads ; + final public int committedWrite ; + final public int abortedWrite ; + State(TransactionManager tm) + { + activeReaders = tm.activeReaders ; + activeWriters = tm.activeWriters ; + finishedReads = tm.finishedReads ; + committedWrite = tm.committedWrite ; + abortedWrite = tm.abortedWrite ; + } + } private BlockingQueue queue = new LinkedBlockingDeque() ; @@ -97,9 +114,9 @@ public class TransactionManager // } switch (mode) { - case READ : readers++ ; break ; + case READ : activeReaders++ ; break ; case WRITE : - int x = writers++ ; + int x = activeWriters++ ; if ( x > 0 ) throw new TDBTransactionException("Existing active write transaction") ; break ; @@ -140,21 +157,25 @@ public class TransactionManager endTransaction(transaction) ; - if ( transaction.getMode() == ReadWrite.WRITE ) + switch ( transaction.getMode() ) { - if ( readers == 0 ) - // Can commit imemdiately. - commitTransaction(transaction) ; - else - { - // Can't make permentent at the moment. - commitedAwaitingFlush.add(transaction) ; - //log.debug("Commit pending: "+transaction.getLabel()); - - //if ( log.isDebugEnabled() ) - // log.debug("Commit blocked at the moment") ; - queue.add(transaction) ; - } + case READ: + endOfRead(transaction) ; + break ; + case WRITE: + if ( activeReaders == 0 ) + // Can commit imemdiately. + commitTransaction(transaction) ; + else + { + // Can't make permanent at the moment. + commitedAwaitingFlush.add(transaction) ; + log.debug("Commit flush: "+transaction.getLabel()); + //if ( log.isDebugEnabled() ) + // log.debug("Commit blocked at the moment") ; + queue.add(transaction) ; + } + committedWrite ++ ; } } @@ -179,9 +200,51 @@ public class TransactionManager // Transaction has done the abort on all the transactional elements. if ( ! activeTransactions.contains(transaction) ) SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ; + endTransaction(transaction) ; + + switch ( transaction.getMode() ) + { + case READ: + endOfRead(transaction) ; + break ; + case WRITE: + // Journal cleaned in Transaction.abort. + abortedWrite ++ ; + } + } + + /** READ specific final actions. */ + private void endOfRead(Transaction transaction) + { + processDelayedReplyQueue(transaction) ; + finishedReads ++ ; } + private void processDelayedReplyQueue(Transaction txn) + { + if ( activeReaders != 0 || activeWriters != 0 ) + { + if ( queue.size() > 0 ) + if ( log() ) log(format("Pending transactions: R=%d / W=%d", activeReaders, activeWriters), txn) ; + return ; + } + while ( queue.size() > 0 ) + { + try { + Transaction txn2 = queue.take() ; + + if ( txn2.getMode() == READ ) + continue ; + log("Flush delayed commit", txn2) ; + // This takes a Write lock on the DSG - this is where it blocks. + JournalControl.replay(txn2) ; + commitedAwaitingFlush.remove(txn2) ; + } catch (InterruptedException ex) + { Log.fatal(this, "Interruped!", ex) ; } + } + } + synchronized public void notifyClose(Transaction txn) { @@ -192,40 +255,16 @@ public class TransactionManager String x = txn.getBaseDataset().getLocation().getDirectoryPath() ; syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ; txn.abort() ; - } - - // Process any pending commits held up due to a reader. - if ( readers == 0 && writers == 0 ) - { - // Given this is sync'ed to this TransactionManager, - // the query never blocks, nor does it need to be concurrent-safe. - // later ... - while ( queue.size() > 0 ) - { - try { - Transaction txn2 = queue.take() ; - if ( txn2.getMode() == READ ) - continue ; - log.info("Delayed commit", txn2) ; - // This takes a Write lock on the DSG - this is where it blocks. - JournalControl.replay(txn2) ; - commitedAwaitingFlush.remove(txn) ; - } catch (InterruptedException ex) - { Log.fatal(this, "Interruped!", ex) ; } - } - } - else - { - if ( log() ) log(format("Pending transactions: R=%d / W=%d", readers, writers), txn) ; + return ; } } - + private void endTransaction(Transaction transaction) { if ( transaction.getMode() == READ ) - readers-- ; + activeReaders-- ; else - writers-- ; + activeWriters-- ; activeTransactions.remove(transaction) ; } @@ -249,6 +288,10 @@ public class TransactionManager log.debug(txn.getLabel()+": "+msg) ; } + synchronized + public State state() + { return new State(this) ; } + // LATER. class Committer implements Runnable { Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/AbstractTestTransSeq.java Wed Jul 27 21:03:03 2011 @@ -19,9 +19,12 @@ package com.hp.hpl.jena.tdb.transaction; +import java.util.Iterator ; + import org.junit.Test ; import org.openjena.atlas.junit.BaseTest ; +import com.hp.hpl.jena.graph.Node ; import com.hp.hpl.jena.sparql.core.Quad ; import com.hp.hpl.jena.sparql.sse.SSE ; import com.hp.hpl.jena.tdb.DatasetGraphTxn ; @@ -98,10 +101,34 @@ public abstract class AbstractTestTransS @Test public void trans_05() { - // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit + // READ before WRITE remains seeing old view - READ before WRITE starts + StoreConnection sConn = getStoreConnection() ; + DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ; + DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ; + + dsgW.add(q) ; + dsgW.commit() ; + dsgW.close() ; + + assertFalse(dsgR1.contains(q)) ; + dsgR1.close() ; + + DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ; + assertTrue(dsgR2.contains(q)) ; + dsgR2.close() ; + } + + @Test public void trans_06() + { + // READ(block)-WRITE-commit-WRITE-abort-WRITE-commit-READ(close)-check StoreConnection sConn = getStoreConnection() ; DatasetGraphTxn dsgR1 = sConn.begin(ReadWrite.READ) ; + // IF + // dsgR1.close() ; + // THEN it works. + // ==> deplay replay + DatasetGraphTxn dsgW1 = sConn.begin(ReadWrite.WRITE) ; dsgW1.add(q1) ; dsgW1.commit() ; @@ -123,27 +150,13 @@ public abstract class AbstractTestTransS dsgR1.close() ; DatasetGraphTxn dsgR2 = sConn.begin(ReadWrite.READ) ; + assertTrue(dsgR2.contains(q1)) ; assertFalse(dsgR2.contains(q2)) ; assertTrue(dsgR2.contains(q3)) ; dsgR2.close() ; } - @Test public void trans_06() - { - // READ before WRITE remains seeing old view - READ before WRITE starts - StoreConnection sConn = getStoreConnection() ; - DatasetGraphTxn dsgR = sConn.begin(ReadWrite.READ) ; - DatasetGraphTxn dsgW = sConn.begin(ReadWrite.WRITE) ; - - dsgW.add(q) ; - dsgW.commit() ; - dsgW.close() ; - - assertFalse(dsgR.contains(q)) ; - dsgR.close() ; - } - @Test public void trans_07() { // READ before WRITE remains seeing old view - READ after WRITE starts Modified: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java?rev=1151627&r1=1151626&r2=1151627&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystem.java Wed Jul 27 21:03:03 2011 @@ -19,6 +19,7 @@ package com.hp.hpl.jena.tdb.transaction; import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ; +import static java.lang.String.format ; import java.util.concurrent.Callable ; import java.util.concurrent.ExecutorService ; @@ -30,6 +31,10 @@ import org.junit.AfterClass ; import org.junit.BeforeClass ; import org.openjena.atlas.lib.FileOps ; import org.openjena.atlas.lib.Lib ; +import org.openjena.atlas.lib.RandomLib ; +import org.openjena.atlas.logging.Log ; +import org.slf4j.Logger ; +import org.slf4j.LoggerFactory ; import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ; import com.hp.hpl.jena.graph.Node ; @@ -43,23 +48,47 @@ import com.hp.hpl.jena.tdb.base.file.Loc /** System testing of the transactions. */ public class TestTransSystem { + static { Log.setLog4j() ; } + private static Logger log = LoggerFactory.getLogger(TestTransSystem.class) ; + static final boolean progress = ! log.isInfoEnabled() ; + + + static final int Iterations = 1 ; + static final int readerSeqRepeats = 10 ; + static final int readerMaxPause = 50 ; + static final int writerAbortSeqRepeats = 1 ; + static final int writerCommitSeqRepeats = 5 ; + static final int writerMaxPause = 10 ; + public static void main(String...args) { - final int N = 100 ; + + final int N = (Iterations < 10) ? 1 : Iterations / 10 ; int i ; - for ( i = 0 ; i < 1000 ; i++ ) + for ( i = 0 ; i < Iterations ; i++ ) { if ( i%N == 0 ) - System.out.printf("%03d: ",i) ; - System.out.print(".") ; + printf("%03d: ",i) ; + printf(".") ; if ( i%N == (N-1) ) - System.out.println() ; + println() ; new TestTransSystem().manyReaderAndOneWriter() ; } if ( i%N != 0 ) System.out.println() ; - System.out.println() ; - System.out.printf("DONE (%03d)\n",i) ; + println() ; + printf("DONE (%03d)\n",i) ; + } + + private static void println() + { + printf("\n") ; + } + + private static void printf(String string, Object...args) + { + if ( progress ) + System.out.printf(string, args) ; } private ExecutorService execService = Executors.newCachedThreadPool() ; @@ -124,15 +153,15 @@ public class TestTransSystem final int numOfTasks = 10 ; final StoreConnection sConn = getStoreConnection() ; - Callable procR = new Reader(sConn, 10, 50) ; // Number of repeats, max pause - Callable procW_a = new Writer(sConn, 1, 10, false) // Number of repeats, max pause, commit. + Callable procR = new Reader(sConn, readerSeqRepeats, readerMaxPause) ; // Number of repeats, max pause + Callable procW_a = new Writer(sConn, writerAbortSeqRepeats, writerMaxPause, false) // Number of repeats, max pause, commit. { @Override protected int change(DatasetGraphTxn dsg, int id, int i) { return changeProc(dsg, id, i) ; } } ; - Callable procW_c = new Writer(sConn, 5, 10, true) // Number of repeats, max pause, commit. + Callable procW_c = new Writer(sConn, writerCommitSeqRepeats, writerMaxPause, true) // Number of repeats, max pause, commit. { @Override protected int change(DatasetGraphTxn dsg, int id, int i) @@ -158,16 +187,18 @@ public class TestTransSystem static int changeProc(DatasetGraphTxn dsg, int id, int i) { int count = 0 ; - int N = 5 ; + int maxN = 500 ; + int N = RandomLib.qrandom.nextInt(maxN) ; for ( int j = 0 ; j < N; j++ ) { - Quad q = genQuad(id+j) ; + Quad q = genQuad(id*maxN+j) ; if ( ! dsg.contains(q) ) { dsg.add(q) ; count++ ; } } + log.debug("Change = "+dsg.getDefaultGraph().size()) ; return count ; } @@ -191,11 +222,14 @@ public class TestTransSystem for ( int i = 0 ; i < repeats; i++ ) { DatasetGraphTxn dsg = sConn.begin(ReadWrite.READ) ; + log.debug("reader start "+id+"/"+i) ; + int x1 = count("SELECT * { ?s ?p ?o }", dsg) ; pause(maxpause) ; int x2 = count("SELECT * { ?s ?p ?o }", dsg) ; if ( x1 != x2 ) - System.err.printf("Change seen: id=%d: i=%d\n", id, i) ; + log.warn(format("Change seen: id=%d: i=%d\n", id, i)) ; + log.debug("reader finish "+id+"/"+i) ; dsg.close() ; } return null ; @@ -224,6 +258,8 @@ public class TestTransSystem for ( int i = 0 ; i < repeats ; i++ ) { DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ; +System.err.println("writer "+id+"/"+i) ; + int x1 = count("SELECT * { ?s ?p ?o }", dsg) ; int z = change(dsg, id, i) ; pause(maxpause) ;