Return-Path: X-Original-To: apmail-incubator-jena-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-jena-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D1846C11 for ; Tue, 12 Jul 2011 16:17:01 +0000 (UTC) Received: (qmail 85772 invoked by uid 500); 12 Jul 2011 16:17:01 -0000 Delivered-To: apmail-incubator-jena-commits-archive@incubator.apache.org Received: (qmail 85744 invoked by uid 500); 12 Jul 2011 16:17:01 -0000 Mailing-List: contact jena-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jena-dev@incubator.apache.org Delivered-To: mailing list jena-commits@incubator.apache.org Received: (qmail 85429 invoked by uid 99); 12 Jul 2011 16:17:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jul 2011 16:17:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 Jul 2011 16:16:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id ECECF2388980; Tue, 12 Jul 2011 16:16:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1145650 - in /incubator/jena/Experimental/TxTDB/trunk: src-dev/tx/ src/main/java/com/hp/hpl/jena/tdb/ src/main/java/com/hp/hpl/jena/tdb/transaction/ Date: Tue, 12 Jul 2011 16:16:38 -0000 To: jena-commits@incubator.apache.org From: andy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110712161638.ECECF2388980@eris.apache.org> Author: andy Date: Tue Jul 12 16:16:38 2011 New Revision: 1145650 URL: http://svn.apache.org/viewvc?rev=1145650&view=rev Log: (empty) Added: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java (with props) Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/DatasetGraphTxn.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/DevTx.java Tue Jul 12 16:16:38 2011 @@ -3,13 +3,17 @@ package tx; public class DevTx { + // -------- // To do for release 0: + // Journal left full when replayed. + // Txn over block/commited transaction + // -------- + // Tests - // Write back of commited transaction when outstanding readers. // Transaction over block/commited transaction // Tidy up: - // A DatasetGraphTDB is 3 NodeTupelTables. Build as such. + // A DatasetGraphTDB is 3 NodeTupleTables. Build as such. // DatasetBuilderStd.makeNodeTupleTable. // TripleTable, QuadTable, DatasetPrefixesTDB are just function-adding wrappers. // Then readonly NodeTupleTable. Remove NodeTableBuilderReadonly @@ -43,8 +47,6 @@ public class DevTx // Enable txn mode and then must use transactions. // or require queries explicitly closed. - - // Evenetually: // Iterator> find(Tuple tuple) Modified: incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src-dev/tx/TxMain.java Tue Jul 12 16:16:38 2011 @@ -8,6 +8,7 @@ package tx; import org.openjena.atlas.lib.Bytes ; import org.openjena.atlas.lib.FileOps ; +import org.openjena.atlas.lib.Lib ; import org.openjena.atlas.logging.Log ; import com.hp.hpl.jena.graph.Graph ; @@ -67,11 +68,13 @@ public class TxMain public static void main(String... args) { - initFS() ; + //initFS() ; + //** Leaving journals behind after replay. StoreConnection sConn = StoreConnection.make(DBdir) ; // Take a blocking read connection. DatasetGraphTxn dsgRead = sConn.begin(ReadWrite.READ) ; + dsgRead.close() ; DatasetGraphTxn dsg = sConn.begin(ReadWrite.WRITE) ; load("D.ttl", dsg) ; @@ -81,12 +84,13 @@ public class TxMain // Cheap: lock on begin/READ. // dsg.commit() ; - - dsg = sConn.begin(ReadWrite.READ) ; - query("DSG1", "SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ; - dsg.close() ; + dsg.close() ; // Hmm +// dsg = sConn.begin(ReadWrite.READ) ; +// query("DSG1", "SELECT (count(*) AS ?C) { ?s ?p ?o }", dsg) ; +// dsg.close() ; - dsgRead.close() ; // Transaction can now write chnages to the real DB. + dsgRead.close() ; // Transaction can now write changes to the real DB. + exit(0) ; } private static void write(Graph graph, String lang) Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/DatasetGraphTxn.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/DatasetGraphTxn.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/DatasetGraphTxn.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/DatasetGraphTxn.java Tue Jul 12 16:16:38 2011 @@ -6,9 +6,7 @@ package com.hp.hpl.jena.tdb; - import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ; -import com.hp.hpl.jena.tdb.sys.SystemTDB ; import com.hp.hpl.jena.tdb.transaction.Transaction ; public class DatasetGraphTxn extends DatasetGraphTDB @@ -23,13 +21,11 @@ public class DatasetGraphTxn extends Dat public Transaction getTransaction() { return transaction ; } - synchronized public void commit() { transaction.commit() ; } - synchronized public void abort() { transaction.abort() ; @@ -43,11 +39,9 @@ public class DatasetGraphTxn extends Dat synchronized public void close() { - if ( transaction.getMode() == ReadWrite.WRITE ) - { - SystemTDB.syslog.warn("close: Transaction not commited or aborted: Transaction: "+transaction.getTxnId()+" @ "+getLocation().getDirectoryPath()) ; - abort() ; - } + if ( transaction != null ) + transaction.close() ; + //transaction = null ; //Don't really close. Might close the core resources which are shared. //super.close() ; } Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/StoreConnection.java Tue Jul 12 16:16:38 2011 @@ -25,6 +25,7 @@ import com.hp.hpl.jena.tdb.base.file.Loc import com.hp.hpl.jena.tdb.setup.DatasetBuilderStd ; import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ; import com.hp.hpl.jena.tdb.sys.TDBMaker ; +import com.hp.hpl.jena.tdb.transaction.JournalControl ; import com.hp.hpl.jena.tdb.transaction.TransactionManager ; /** Interface to the TDB transaction mechanism */ @@ -62,6 +63,7 @@ public class StoreConnection if ( sConn == null ) { sConn = new StoreConnection(location) ; + JournalControl.recovery(sConn.baseDSG) ; cache.put(location, sConn) ; } return sConn ; Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/JournalControl.java Tue Jul 12 16:16:38 2011 @@ -139,8 +139,8 @@ public class JournalControl journal.position(0) ; dsg.getLock().enterCriticalSection(Lock.WRITE) ; try { - for ( JournalEntry e : journal ) - replay(e, dsg) ; + for ( JournalEntry e : journal ) + replay(e, dsg) ; } catch (RuntimeException ex) { Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/Transaction.java Tue Jul 12 16:16:38 2011 @@ -15,6 +15,7 @@ import java.util.List ; import com.hp.hpl.jena.tdb.ReadWrite ; import com.hp.hpl.jena.tdb.store.DatasetGraphTDB ; import com.hp.hpl.jena.tdb.sys.FileRef ; +import com.hp.hpl.jena.tdb.sys.SystemTDB ; /** A transaction handle */ public class Transaction @@ -24,8 +25,7 @@ public class Transaction private final TransactionManager txnMgr ; private final List> iterators ; private Journal journal = null ; - private enum State { ACTIVE, PREPARING, COMMITED, ABORTED } - private State state ; + private TxnState state ; private ReadWrite mode ; private final List nodeTableTrans = new ArrayList() ; @@ -39,28 +39,25 @@ public class Transaction this.basedsg = basedsg ; this.mode = mode ; this.iterators = new ArrayList>() ; - state = State.ACTIVE ; + state = TxnState.ACTIVE ; } + synchronized public void commit() { - if ( mode == ReadWrite.READ ) + if ( mode == ReadWrite.WRITE ) { - state = State.COMMITED ; - return ; + if ( state != TxnState.ACTIVE ) + throw new TDBTransactionException("Transaction has already committed or aborted") ; + + prepare() ; + + JournalEntry entry = new JournalEntry(JournalEntryType.Commit, FileRef.Journal, null) ; + journal.writeJournal(entry) ; + journal.sync() ; // Commit point. } - - if ( state != State.ACTIVE ) - throw new TDBTransactionException("Transaction has already committed or aborted") ; - - prepare() ; - - JournalEntry entry = new JournalEntry(JournalEntryType.Commit, FileRef.Journal, null) ; - journal.writeJournal(entry) ; - journal.sync() ; // Commit point. - // Attempt to play the journal into the dataset. - // This is idempotent and safe to partial replay. - state = State.COMMITED ; + + state = TxnState.COMMITED ; txnMgr.notifyCommit(this) ; } @@ -69,9 +66,9 @@ public class Transaction if ( mode == ReadWrite.READ ) return ; - if ( state != State.ACTIVE ) + if ( state != TxnState.ACTIVE ) throw new TDBTransactionException("Transaction has already committed or aborted") ; - state = State.PREPARING ; + state = TxnState.PREPARING ; for ( BlockMgrJournal x : blkMgrs ) x.commit(this) ; @@ -85,15 +82,16 @@ public class Transaction prepare() ; } + synchronized public void abort() { if ( mode == ReadWrite.READ ) { - state = State.ABORTED ; + state = TxnState.ABORTED ; return ; } - if ( state != State.ACTIVE ) + if ( state != TxnState.ACTIVE ) throw new TDBTransactionException("Transaction has already committed or aborted") ; journal.truncate(0) ; @@ -104,11 +102,37 @@ public class Transaction for ( NodeTableTrans x : nodeTableTrans ) x.abort(this) ; - state = State.ABORTED ; + state = TxnState.ABORTED ; + txnMgr.notifyAbort(this) ; + } + + /** transaction close happens after commit/abort + * read transactions "auto commit" on close(). + * write transactions must call abort or commit. + */ + synchronized + public void close() + { + switch(state) + { + case ACTIVE: + if ( mode == ReadWrite.READ ) + commit() ; + else + { + SystemTDB.errlog.warn("Transaction not commited or aborted") ; + abort() ; + } + break ; + default: + } + state = TxnState.CLOSED ; + txnMgr.notifyClose(this) ; } public ReadWrite getMode() { return mode ; } + public TxnState getState() { return state ; } public long getTxnId() { return id ; } public TransactionManager getTxnMgr() { return txnMgr ; } @@ -146,6 +170,12 @@ public class Transaction { return basedsg ; } + + @Override + public String toString() + { + return "Transaction: "+id+" : Mode="+mode+" : State="+state+" : "+basedsg.getLocation().getDirectoryPath() ; + } } /* Modified: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java?rev=1145650&r1=1145649&r2=1145650&view=diff ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java (original) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TransactionManager.java Tue Jul 12 16:16:38 2011 @@ -7,12 +7,18 @@ package com.hp.hpl.jena.tdb.transaction; import static com.hp.hpl.jena.tdb.ReadWrite.READ ; +import static java.lang.String.format ; import java.util.ArrayList ; import java.util.HashSet ; import java.util.Iterator ; import java.util.List ; import java.util.Set ; +import java.util.concurrent.BlockingQueue ; +import java.util.concurrent.LinkedBlockingDeque ; + +import org.openjena.atlas.logging.Log ; +import org.slf4j.Logger ; import com.hp.hpl.jena.sparql.core.DatasetGraph ; import com.hp.hpl.jena.tdb.DatasetGraphTxn ; @@ -23,6 +29,10 @@ import com.hp.hpl.jena.tdb.sys.SystemTDB public class TransactionManager { + // TODO Don't keep counter, keep lists. + + private static Logger log = SystemTDB.syslog ; + private Set activeTransactions = new HashSet() ; // Transactions that have commit (and the journal is written) but haven't @@ -32,21 +42,20 @@ public class TransactionManager private int readers = 0 ; private int writers = 0 ; // 0 or 1 - private int committed = 0 ; + private int committed = 0 ; + + private BlockingQueue queue = new LinkedBlockingDeque() ; + + private Thread committerThread ; -// public static DataSource begin(Dataset ds) -// { -// return null ; -// } -// -// public static DataSource commit(Dataset ds) -// { -// return null ; -// } - public TransactionManager() - { + { + // LATER +// Committer c = new Committer() ; +// this.committerThread = new Thread(c) ; +// committerThread.setDaemon(true) ; +// committerThread.start() ; } private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode) @@ -58,6 +67,9 @@ public class TransactionManager synchronized public DatasetGraphTxn begin(DatasetGraph dsg, ReadWrite mode) { + if ( log.isDebugEnabled() ) + log.debug(format("begin: R={} / W={} / #={}", readers, writers,activeTransactions.size())) ; + // If already a transaction ... // Subs transactions are a new view - commit is only commit to parent transaction. if ( dsg instanceof DatasetGraphTxn ) @@ -77,6 +89,8 @@ public class TransactionManager case WRITE : if ( writers > 0 ) throw new TDBTransactionException("Existing active transaction") ; + writers ++ ; + break ; } DatasetGraphTDB dsgtdb = (DatasetGraphTDB)dsg ; @@ -85,39 +99,133 @@ public class TransactionManager dsgtdb.sync() ; Transaction txn = createTransaction(dsgtdb, mode) ; + DatasetGraphTxn dsgTxn = (DatasetGraphTxn)new DatasetBuilderTxn(this).build(txn, mode, dsgtdb) ; Iterator iter = dsgTxn.getTransaction().components() ; for ( ; iter.hasNext() ; ) iter.next().begin(dsgTxn.getTransaction()) ; - + activeTransactions.add(txn) ; + if ( log.isDebugEnabled() ) + log.debug("begin: "+txn) ; return dsgTxn ; } + synchronized public void notifyCommit(Transaction transaction) { + if ( log.isDebugEnabled() ) + log.debug("commit: "+transaction) ; + if ( ! activeTransactions.contains(transaction) ) SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ; endTransaction(transaction) ; - // [TxTDB:TODO] CAN WE DO THIS? - JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ; + if ( readers == 0 && transaction.getMode() == ReadWrite.WRITE ) + // New readers blocked from starting by the synchronized here and on begin. + JournalControl.replay(transaction.getJournal(), transaction.getBaseDataset()) ; + else + { + commitedAwaitingFlush.add(transaction) ; + if ( log.isDebugEnabled() ) + log.info("Commit blocked at the moment") ; + queue.add(transaction) ; + } } + synchronized public void notifyAbort(Transaction transaction) { + if ( log.isDebugEnabled() ) + log.info("notifyAbort: "+transaction) ; if ( ! activeTransactions.contains(transaction) ) SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ; endTransaction(transaction) ; } + synchronized + public void notifyClose(Transaction txn) + { + if ( log.isDebugEnabled() ) + log.debug("notifyClose: "+txn) ; + + if ( txn.getState() == TxnState.ACTIVE ) + { + String x = txn.getBaseDataset().getLocation().getDirectoryPath() ; + SystemTDB.syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ; + txn.abort() ; + } + + if ( readers == 0 && writers == 0 ) + { + // Given this is sync'ed to the TransactionManager, + // the query never blocks, nor does it need to be concurrent-safe. + // later ... + while ( queue.size() > 0 ) + { + try { + Transaction txn2 = queue.take() ; + if ( txn2.getMode() == READ ) + continue ; + log.info("Delayed commit") ; + // This takes a Write lock on the DSG - this is where it blocks. + JournalControl.replay(txn2.getJournal(), txn2.getBaseDataset()) ; + log.info("Delayed commit succeeded") ; + commitedAwaitingFlush.remove(txn) ; + } catch (InterruptedException ex) + { Log.fatal(this, "Interruped!", ex) ; } + } + } + else + { + if ( log.isDebugEnabled() ) + log.debug(format("Pending transactions: R=%d / W=%d", readers, writers)) ; + } + } + private void endTransaction(Transaction transaction) { + if ( log.isDebugEnabled() ) + log.debug("endTransaction: "+transaction) ; + if ( transaction.getMode() == READ ) readers-- ; else writers-- ; activeTransactions.remove(transaction) ; + + if ( log.isDebugEnabled() ) + log.debug(format("endTransaction: R=%d / W=%d / #=%d\n", readers, writers,activeTransactions.size())) ; + + } + + // LATER. + class Committer implements Runnable + { + @Override + public void run() + { + for(;;) + { + // Wait until the reader count goes to zero. + + // This wakes up for every transation but maybe + // able to play several transactions at once (later). + try { + Transaction txn = queue.take() ; + System.out.println("Async commit") ; + // This takes a Write lock on the DSG - this is where it blocks. + JournalControl.replay(txn.getJournal(), txn.getBaseDataset()) ; + System.out.println("Async commit succeeded") ; + synchronized(TransactionManager.this) + { + commitedAwaitingFlush.remove(txn) ; + } + } catch (InterruptedException ex) + { Log.fatal(this, "Interruped!", ex) ; } + } + } + } } Added: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java?rev=1145650&view=auto ============================================================================== --- incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java (added) +++ incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java Tue Jul 12 16:16:38 2011 @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.hp.hpl.jena.tdb.transaction; + +public enum TxnState { ACTIVE, PREPARING, COMMITED, ABORTED, CLOSED } Propchange: incubator/jena/Experimental/TxTDB/trunk/src/main/java/com/hp/hpl/jena/tdb/transaction/TxnState.java ------------------------------------------------------------------------------ svn:mime-type = text/plain