Return-Path: Delivered-To: apmail-incubator-river-commits-archive@locus.apache.org Received: (qmail 6219 invoked from network); 10 Dec 2008 05:15:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Dec 2008 05:15:12 -0000 Received: (qmail 5504 invoked by uid 500); 10 Dec 2008 05:15:24 -0000 Delivered-To: apmail-incubator-river-commits-archive@incubator.apache.org Received: (qmail 5441 invoked by uid 500); 10 Dec 2008 05:15:24 -0000 Mailing-List: contact river-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: river-dev@incubator.apache.org Delivered-To: mailing list river-commits@incubator.apache.org Received: (qmail 5432 invoked by uid 99); 10 Dec 2008 05:15:24 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Dec 2008 21:15:24 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Dec 2008 05:13:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id BF6D02388A61; Tue, 9 Dec 2008 21:14:00 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r724979 [10/14] - in /incubator/river/jtsk/skunk/niclas1/services/outrigger: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/sun/ src/main/java/com/sun/jini/ src/main/java/com/sun/jini/outrigger/ src/main/java/com/sun/... 
Date: Wed, 10 Dec 2008 05:13:55 -0000 To: river-commits@incubator.apache.org From: niclas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081210051409.BF6D02388A61@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java?rev=724979&view=auto ============================================================================== --- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java (added) +++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java Tue Dec 9 21:13:53 2008 @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.sun.jini.outrigger; + +import com.sun.jini.constants.TxnConstants; +import com.sun.jini.logging.Levels; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.jini.core.transaction.CannotJoinException; +import net.jini.core.transaction.server.ServerTransaction; +import net.jini.core.transaction.server.TransactionConstants; +import net.jini.core.transaction.server.TransactionManager; +import net.jini.security.ProxyPreparer; +import net.jini.space.InternalSpaceException; + +/** + * This class represents a space's state in a single transaction. + * + * Object of this class represent Jini transactions within outrigger. + * These transactions hold "Transactables" -- things that represent + * the actions that have been taken under this transaction. For example, + * if this transaction were to be cancelled, the Transactables are + * examined and serve as the list of things to roll back in order to + * restore the state of the Space status quo ante. + * + * This is achieved by having the transactables notified of state changes + * to this transaction such as preparing, commit, etc. The + * transactables themselves are responsible for "doing the right thing." + * + * NB: some--but not all--of the actions one can take with a transaction + * are managed internally by these objects. That is, all of the important + * methods objects of these types are synchronized. Therefore, two + * simultaneous calls to abort() and commit() are arbitrated properly. + * + * However, care must be taken when add()ing a transactable. 
Even + * though the add() method is synchronized if you check the state of + * the transaction to ensure that is active and then call add() the + * transaction could have left the active state between the check and + * the add() call unless the call obtains the appropriate locks. This + * is even more likely if other work needs to be done in addition to + * calling add() (e.g. persisting state, obtaining locks, etc.). The + * caller of add() should lock the associated transaction object and + * ensure that the transaction is still considered ACTIVE, do whatever + * work is necessary to complete while the transaction is in the ACTIVE + * state (including calling call add()) and then release the lock. + * This can be done by : + *
    + *
  • holding the lock on this object while checking the + * state and carrying out the operation (including calling add()), or + *
  • calling ensureActive() to check the state + * and obtain a non-exclusive lock, carrying out the operation + * (including calling add()), and then calling allowStateChange() to + * release the lock. + *
+ * The pair of ensureActive() and allowStateChange() allows for more + * concurrency if the operation is expected to take a long time, in + * that it will allow for other operations to be performed under the + * same transaction and let aborts prevent other operations from + * being started. + * + * @author Sun Microsystems, Inc. + */ +class Txn implements TransactableMgr, TransactionConstants, StorableObject +{ + + /** + * The internal id Outrigger as assigned to the transaction + */ + final private long id; + + /** + * What state we think the transaction is in + */ + private int state; + + /** + * The transaction manager associated with the transaction + * this object is fronting for. + */ + private StorableReference trm; + + /** + * Cached ServerTransaction object for + * the transaction this object is fronting for. + */ + private ServerTransaction tr; + + /** + * The id the transaction manager assigned to this transaction + */ + private long trId; + + /** + * The list of Transactable participating in + * this transaction. + */ + final private List txnables = new java.util.LinkedList(); + + /** + * The task responsible for monitoring to see if this + * transaction has been aborted with us being told, or + * null if no such task as been allocated. + */ + private TxnMonitorTask monitorTask; + + /** + * Count of number of threads holding a read lock on state + */ + private int stateReaders = 0; + + /** + * true if there is a blocked state change. Used + * to give writers priority. + */ + private boolean stateChangeWaiting = false; + + /** + * Logger for logging transaction related information + */ + private static final Logger logger = + Logger.getLogger( OutriggerServerImpl.txnLoggerName ); + + /** + * Create a new Txn that represents our state in the + * given ServerTransaction. 
+ */ + Txn( ServerTransaction tr, long id ) + { + this( id ); + trId = tr.id; + this.tr = tr; + this.trm = new StorableReference( tr.mgr ); + state = ACTIVE; + + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, "creating txn for transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + } + + /** + * Used in recovery + */ + Txn( long id ) + { + this.id = id; // the txn id is not persisted + } + + /** + * Get the id for this txn. Note that this id is NOT the same as + * the ID of the transaction. Since that ID is not unique (it must + * be qualified with the ServerTransaction object) we create + * our own internal id to make txns unique. This is needed since we + * may not have the Transaction unmarshalled. + */ + Long getId() + { + return new Long( id ); + } + + /** + * We keep the transaction ID around because we may need it + * to identify a broken transaction after recovery. + */ + long getTransactionId() + { + return trId; + } + + /** + * Return our local view of the current state. Need to be holding + * the lock on this object or have called ensureActive + * to get the current value. + */ + int getState() + { + return state; + } + + /** + * Atomically checks that this transaction is in the active + * state and locks the transaction in the active state. + * The lock can be released by calling allowStateChange. + * Each call to this method should be paired with a call to + * allowStateChange in a finally block. + * + * @throws CannotJoinException if the transaction + * is not active or a state change is pending. 
+ */ + synchronized void ensureActive() throws CannotJoinException + { + if( state != ACTIVE || stateChangeWaiting ) + { + final String msg = "transaction mgr:" + tr + ", id:" + trId + + " not active, in state " + TxnConstants.getName( state ); + final CannotJoinException e = new CannotJoinException( msg ); + logger.log( Levels.FAILED, msg, e ); + throw e; + } + assert stateReaders >= 0; + stateReaders++; + } + + /** + * Release the read lock created by an ensureActive + * call. Does nothing if the transaction is not active or there is + * a state change pending and thus is safe to call even if the + * corresponding ensureActive call threw + * CannotJoinException. + */ + synchronized void allowStateChange() + { + if( state != ACTIVE || stateChangeWaiting ) + { + return; + } + stateReaders--; + assert stateReaders >= 0; + notifyAll(); + } + + /** + * Prevents new operations from being started under this + * transaction and blocks until in process operations are + * completed. + */ + synchronized void makeInactive() + { + stateChangeWaiting = true; + assert stateReaders >= 0; + while( stateReaders != 0 ) + { + try + { + wait(); + } + catch( InterruptedException e ) + { + throw new AssertionError( e ); + } + assert stateReaders >= 0; + } + } + + /** + * Prepare for transaction commit. makeInactive must have + * been called on this transaction first. 
+ */ + synchronized int prepare( OutriggerServerImpl space ) + { + assert stateChangeWaiting : "prepare called before makeInactive"; + assert stateReaders == 0 : "prepare called before makeInactive completed"; + + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + switch( state ) + { + case ABORTED: // previously aborted + return ABORTED; + + case COMMITTED: // previously committed + throw new IllegalStateException(); // "cannot happen" + + case NOTCHANGED: // previously voted NOTCHANGED + case PREPARED: // previously voted PREPARED + return state; // they are idempotent, and + // and we have nothing to do + // so return + + case ACTIVE: // currently active + boolean changed = false; // did this txn change + // anything? + + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "prepare:preparing transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + // loop through Transactable members of this Txn + final Iterator i = txnables.iterator(); + int c = 0; // Counter for debugging message + while( i.hasNext() ) + { + // get this member's vote + final Transactable transactable = (Transactable) i.next(); + final int prepState = transactable.prepare( this, space ); + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "prepare:prepared " + + "transactable {0} for transaction mgr:{1}, id:{2}," + + " transactable now in state {3}", + new Object[]{ transactable, tr, new Long( trId ), + TxnConstants.getName( prepState ) } ); + } + + switch( prepState ) + { + case PREPARED: // has prepared state + changed = true; // this means a change + continue; + + case ABORTED: // has to abort + abort( space ); // abort this txn (does cleanup) + state = ABORTED; + return state; // vote aborted + + case NOTCHANGED: // no 
change + i.remove(); // Won't need to call again + continue; + + default: // huh? + throw new + InternalSpaceException( "prepare said " + prepState ); + } + } + + if( changed ) + { + state = PREPARED; + // have to watch this since it's now holding permanent + // resources + space.monitor( Collections.nCopies( 1, this ) ); + } + else + { + state = NOTCHANGED; + } + break; + + default: + throw new IllegalStateException( "unknown Txn state: " + state ); + } + + return state; + } + + /** + * Abort the transaction. This must be callable from + * prepare because if a Transactable + * votes ABORT, this method is called to make that + * happen. makeInactive must have been called on this + * transaction first. + */ + synchronized void abort( OutriggerServerImpl space ) + { + assert stateChangeWaiting : "abort called before makeInactive"; + assert stateReaders == 0 : "abort called before makeInactive completed"; + + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, "abort: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + switch( state ) + { + case ABORTED: // already aborted + case NOTCHANGED: // nothing to abort + break; + + case COMMITTED: // "cannot happen" + throw new IllegalStateException( "aborting a committed txn" ); + + case ACTIVE: + case PREPARED: + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "abort:aborting transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + final Iterator i = txnables.iterator(); + while( i.hasNext() ) + { + ( (Transactable) i.next() ).abort( this, space ); + } + state = ABORTED; + cleanup(); + break; + + default: + throw new IllegalStateException( "unknown Txn state: " + state ); + } + } + + /** + * Having prepared, roll the changes + * forward. makeInactive must have been called on + * this transaction first. 
+ */ + synchronized void commit( OutriggerServerImpl space ) + { + assert stateChangeWaiting : "commit called before makeInactive"; + assert stateReaders == 0 : "commit called before makeInactive completed"; + + //!! Need to involve mgr here + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, "commit: transaction mgr:{0}, id:{1}, " + + "state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + switch( state ) + { + case ABORTED: // "cannot happen" stuff + case ACTIVE: + case NOTCHANGED: + throw new IllegalStateException( "committing " + + TxnConstants.getName( state ) + " txn" ); + + case COMMITTED: // previous committed, that's okay + return; + + case PREPARED: // voted PREPARED, time to finish up + final Iterator i = txnables.iterator(); + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "commit:committing transaction mgr:" + + "{0}, id:{1}, state:{2}", + new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } ); + } + + while( i.hasNext() ) + { + ( (Transactable) i.next() ).commit( this, space ); + } + state = COMMITTED; + cleanup(); + return; + + default: + throw new IllegalStateException( "unknown Txn state: " + state ); + } + } + + /** + * Caution: see locking discussion at the class level. + */ + public synchronized Transactable add( Transactable t ) + { + txnables.add( t ); + return t; + } + + // inherit doc comment + public ServerTransaction getTransaction( ProxyPreparer preparer ) + throws IOException, ClassNotFoundException + { + if( tr == null ) + { + final TransactionManager mgr = + (TransactionManager) trm.get( preparer ); + tr = new ServerTransaction( mgr, trId ); + } + return tr; + } + + /** + * Return the manager associated with this transaction. + * + * @return the manager associated with this transaction. + * @throws IllegalStateException if this Txn + * is still broken. 
+ */ + TransactionManager getManager() + { + if( tr == null ) + { + throw new IllegalStateException( "Txn is still broken" ); + } + return tr.mgr; + } + + /** + * Return the monitor task for this object. Note, this + * method is unsynchronized because it (and + * monitorTask(TxnMonitorTask) are both called + * from the same thread. + */ + TxnMonitorTask monitorTask() + { + return monitorTask; + } + + /** + * Set the monitor task for this object. Note, this method is + * unsynchronized because it (and monitorTask() are + * both called from the same thread. + */ + void monitorTask( TxnMonitorTask task ) + { + monitorTask = task; + } + + /** + * Clean up any state when the transaction is finished. + */ + private void cleanup() + { + if( monitorTask != null ) + { + monitorTask.cancel(); // stop doing this + } + } + + // ----------------------------------- + // Methods required by StorableObject + // ----------------------------------- + + // inherit doc comment + + public void store( ObjectOutputStream out ) throws IOException + { + /* There is a bunch of stuff we don't need to write. The + * Txn id not stored since it is handed back during + * recovery. The content is rebuilt txnables by the various + * recoverWrite and recoverTake calls. state is not written + * because it is always ACTIVE when we write, and always + * needs to be PREPARED when we read it back. 
+ */ + out.writeObject( trm ); + out.writeLong( trId ); + } + + // inherit doc comment + public void restore( ObjectInputStream in ) + throws IOException, ClassNotFoundException + { + /* Only transactions that got prepared and not committed or + * aborted get recovered + */ + state = PREPARED; + trm = (StorableReference) in.readObject(); + trId = in.readLong(); + } +} Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java?rev=724979&view=auto ============================================================================== --- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java (added) +++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java Tue Dec 9 21:13:53 2008 @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.sun.jini.outrigger; + +import com.sun.jini.config.Config; +import com.sun.jini.thread.TaskManager; +import com.sun.jini.thread.WakeupManager; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.jini.config.Configuration; +import net.jini.config.ConfigurationException; + +/** + * This class provides a driver for monitoring the state of transactions + * that have blocked progress of other operations recently. It creates + * tasks that monitor each transaction by intermittently querying the + * transaction's state. If it finds that the transaction has aborted, + * it makes sure that the local space aborts the transaction, too, so + * that operations will cease to be blocked by the transaction. + * + * @author Sun Microsystems, Inc. + * @see TxnMonitorTask + * @see OutriggerServerImpl#monitor + */ +class TxnMonitor implements Runnable +{ + /** + * Each ToMonitor object represents a need to monitor + * the given transactions, possibly under a lease. + * + * @see #pending + */ + private static class ToMonitor + { + QueryWatcher query; // query governing interest in txns + Collection txns; // the transactions to monitor + + ToMonitor( QueryWatcher query, Collection txns ) + { + this.query = query; + this.txns = txns; + } + } + + /** + * This list is used to contain requests to monitor interfering + * transactions. We use a list like this so that the + * getMatch request that detected the conflict + * doesn't have to wait for all the setup before returning -- it + * just puts the data on this list and the TxnMonitor + * pulls it off using its own thread. 
+ * + * @see #ToMonitor + * @see OutriggerServerImpl#getMatch + */ + private LinkedList pending = new LinkedList(); + + /** + * wakeup manager for TxnMonitorTasks + */ + private final WakeupManager wakeupMgr = + new WakeupManager( new WakeupManager.ThreadDesc( null, true ) ); + + /** + * The manager for TxnMonitorTask objects. + */ + private TaskManager taskManager; + + /** + * The space we belong to. Needed for aborts. + */ + private OutriggerServerImpl space; + + /** + * The thread running us. + */ + private Thread ourThread; + + /** + * Set when we are told to stop + */ + private boolean die = false; + + /** + * Logger for logging transaction related information + */ + private static final Logger logger = + Logger.getLogger( OutriggerServerImpl.txnLoggerName ); + + /** + * Create a new TxnMonitor. + */ + TxnMonitor( OutriggerServerImpl space, Configuration config ) + throws ConfigurationException + { + if( space == null ) + { + throw new NullPointerException( "space must be non-null" ); + } + this.space = space; + + taskManager = (TaskManager) Config.getNonNullEntry( config, + OutriggerServerImpl.COMPONENT_NAME, "txnMonitorTaskManager", + TaskManager.class, new TaskManager() ); + + ourThread = new Thread( this, "TxnMonitor" ); + ourThread.setDaemon( true ); + ourThread.start(); + } + + public void destroy() + { + taskManager.terminate(); + wakeupMgr.stop(); + + synchronized( this ) + { + die = true; + notifyAll(); + } + + try + { + ourThread.join(); + } + catch( InterruptedException ie ) + { + // ignore + } + } + + /** + * Return the space we're part of. + */ + OutriggerServerImpl space() + { + return space; + } + + /** + * Add a set of transactions to be monitored under the + * given query. 
+ */ + synchronized void add( QueryWatcher query, Collection transactions ) + { + /* Build the trace message only when FINEST is enabled. */ + if( logger.isLoggable( Level.FINEST ) ) + { + final StringBuffer buf = new StringBuffer(); + buf.append( "Setting up monitor for " ); + buf.append( query ); + buf.append( " toMonitor:" ); + boolean notFirst = false; + for( Iterator i = transactions.iterator(); i.hasNext(); ) + { + /* A separator goes before every element except the first. */ + if( notFirst ) + { + buf.append( "," ); + } + /* Set the flag outside the conditional; inside it the flag could never become true. */ + notFirst = true; + buf.append( i.next() ); + } + logger.log( Level.FINEST, buf.toString() ); + } + + /* Queue the request and wake the monitor thread waiting in run(). */ + pending.add( new ToMonitor( query, transactions ) ); + notifyAll(); + } + + /** + * Add a set of transactions to be monitored under no + * lease. + */ + void add( Collection transactions ) + { + add( null, transactions ); + } + + /** + * Take pending monitor requests off the queue, creating the + * required TxnMonitorTask objects and scheduling them. + */ + public void run() + { + try + { + ToMonitor tm; + for( ; ; ) + { + synchronized( this ) + { + + // Sleep if nothing is pending. + while( pending.isEmpty() && !die ) + { + wait(); + } + + if( die ) + { + return; + } + + tm = (ToMonitor) pending.removeFirst(); + } + + logger.log( Level.FINER, "creating monitor tasks for {0}", + tm.query ); + + Iterator it = tm.txns.iterator(); + while( it.hasNext() ) + { + Txn txn = (Txn) it.next(); + TxnMonitorTask task = taskFor( txn ); + task.add( tm.query ); + } + } + } + catch( InterruptedException e ) + { + return; + } + } + + /** + * Return the monitor task for this transaction, creating it if + * necessary. 
+ */ + private TxnMonitorTask taskFor( Txn txn ) + { + TxnMonitorTask task = txn.monitorTask(); + if( task == null ) + { + logger.log( Level.FINER, "creating TxnMonitorTask for {0}", + txn ); + + task = new TxnMonitorTask( txn, this, taskManager, wakeupMgr ); + txn.monitorTask( task ); + taskManager.add( task ); // add it after we've set it in the txn + } + return task; + } +} Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java?rev=724979&view=auto ============================================================================== --- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java (added) +++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java Tue Dec 9 21:13:53 2008 @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.sun.jini.outrigger; + +import com.sun.jini.constants.ThrowableConstants; +import com.sun.jini.constants.TxnConstants; +import com.sun.jini.logging.Levels; +import com.sun.jini.thread.RetryTask; +import com.sun.jini.thread.TaskManager; +import com.sun.jini.thread.WakeupManager; +import java.io.IOException; +import java.rmi.NoSuchObjectException; +import java.rmi.RemoteException; +import java.rmi.UnmarshalException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.jini.core.transaction.TransactionException; +import net.jini.core.transaction.UnknownTransactionException; +import net.jini.core.transaction.server.ServerTransaction; +import net.jini.core.transaction.server.TransactionConstants; + +/** + * A task that will try to validate the state of a transaction. This + * uses weak references a good deal to let the other parts of the system + * be GC'ed as necessary. + *

+ * The retry mechanism is subtle, so bear with me. The purpose is + * to ensure that if any activity is being blocked by a given + * transaction, that transaction will be tested at some point in + * the future (if necessary, i.e., if it still is thought to be + * active). We assume it to be rare that a transaction that the + * space thinks is active is, in fact, aborted, so the algorithm is + * designed to guarantee the detection without a lot of overhead, + * specifically without a lot of RMI calls. + *

+ * Each task has three values: a nextQuery time, a + * mustQuery boolean that forces the next query to be + * made, and deltaT, the time at which the following + * query will be scheduled. When the task is awakened at its + * nextQuery time, it checks to see if it must make an + * actual query to the transaction manager, which it will do if either + * mustQuery is true, or if we know about + * any in progress queries on the space that are blocked on the + * transaction. Whether or not an actual query is made, + * deltaT is added to nextQuery to get the + * nextQuery time, deltaT is doubled, and + * the mustQuery boolean is set to false. + *

+ * There are two kinds of requests with which a transaction + * can cause a conflict -- those with long timeouts (such as + * blocking reads and takes) and those that are under short timeouts + * (such as reads and takes with zero-length timeouts). We will + * treat them separately at several points of the algorithm. A + * short timeout is any query whose expiration time is sooner than + * the nextQuery time. Any other timeout is long. + * If a short query arrives, mustQuery is set to + * true. + *

+ * The result is that any time a transaction causes a conflict, if + * the query on the space has not ended by the time of the + * nextQuery we will attempt to poll the transaction manager. + * We will also poll the transaction manager if any conflict occurred + * on a query on the space with a short timeout. + *

+ * The first time a transaction causes a conflict, we schedule a + * time in the future at which we will poll its status. We do not + * poll right away because often a transaction will complete on + * its own before we get to that time, making the check + * unnecessary. An instant poll is, therefore, unnecessarily + * aggressive, since giving an initial grace time will usually mean + * no poll is made at all. So if the first conflict occurs at + * T0, the nextQuery value will be + * T0+INITIAL_GRACE, the boolean + * will be true to force that poll to happen, and + * deltaT will be set to INITIAL_GRACE. + * + * @author Sun Microsystems, Inc. + * @see TxnMonitor + */ +class TxnMonitorTask extends RetryTask + implements TransactionConstants, com.sun.jini.constants.TimeConstants +{ + /** + * transaction being monitored + */ + private final Txn txn; + + /** + * the monitor we were made by + */ + private final TxnMonitor monitor; + + /** + * All the queries on the space (not queries to the transaction + * manager) waiting for txn to be resolved. + * null until we have at least one. Represented by + * QueryWatcher objects. + */ + private Map queries; + + /** + * count of RemoteExceptions + */ + private int failCnt; + + /** + * The next time we need to poll the transaction manager + * to get txn's actual state. + */ + private long nextQuery; + + /** + * When we're given an opportunity to poll the transaction manager + * for the txn's state, do so. + */ + private boolean mustQuery; + + /** + * next value added to nextQuery + */ + private long deltaT; + + /** + * The initial grace period before the first query. + */ + private static final long INITIAL_GRACE = 15 * SECONDS; + + /** + * The retry time when we have an encountered an exception + */ + private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS; + + /** + * The largest value that deltaT will reach. 
+ */ + private static final long MAX_DELTA_T = 1 * HOURS; + + /** + * The maximum number of failures allowed in a row before we simply + * give up on the transaction and consider it aborted. + */ + private static final int MAX_FAILURES = 3; + + /** + * Logger for logging transaction related information + */ + private static final Logger logger = + Logger.getLogger( OutriggerServerImpl.txnLoggerName ); + + /** + * Create a new TxnMonitorTask. + */ + TxnMonitorTask( Txn txn, TxnMonitor monitor, + TaskManager manager, WakeupManager wakeupMgr ) + { + super( manager, wakeupMgr ); + this.txn = txn; + this.monitor = monitor; + nextQuery = startTime(); // retryTime will add INITIAL_GRACE + deltaT = INITIAL_GRACE; + mustQuery = true; + } + + /** + * Return the time of the next query, bumping deltaT as + * necessary for the next iteration. If the transaction has voted + * PREPARED or the manager has been giving us a + * RemoteException, we should retry on short times; + * otherwise we back off quickly. + */ + public long retryTime() + { + if( failCnt == 0 && txn.getState() != PREPARED ) + { // no failures + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} retryTime adds {1}", + new Object[]{ this, new Long( deltaT ) } ); + } + + nextQuery += deltaT; + if( deltaT < MAX_DELTA_T ) + { + deltaT = Math.min( deltaT * 2, MAX_DELTA_T ); + } + } + else + { + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} retryTime adds {1} (for {2})", + new Object[]{ this, new Long( BETWEEN_EXCEPTIONS ), + ( failCnt != 0 ? "failure" : "PREPARED" ) } ); + } + nextQuery += BETWEEN_EXCEPTIONS; + } + return nextQuery; + } + + /** + * We can run in parallel with any task, so just return + * false. + */ + public boolean runAfter( java.util.List tasks, int size ) + { + return false; + } + + /** + * Add a ``sibling'' transaction, one that is now blocking progress + * on one of the same entries. 
 For example, if a client is blocked + * on a read, another transaction can read the same + * entry, thereby also blocking that same client. This means that + * the transaction for the second read must be + * watched, too. The list of queries for the second transaction + * might be smaller than the list of those in this transaction, but + * the process of figuring out the subset is too expensive, since + * we have tried to make the checking process itself cheap, + * anyway. So we add all queries this task is currently monitoring + * to the task monitoring the second transaction. If there are + * no queries, then the blocking occurred because of a short query + * or all the queries have expired, in which case the second transaction + * isn't blocking the way of anything currently, so this method does + * nothing. + *

+ * Of course, in order to avoid blocking the thread that is calling + * this (which is trying to perform a read, after + * all), we simply add each lease in this task to the monitor's + * queue. + * + * @see TxnEntryHandle#monitor + */ + //!! Would it be worth the overhead to make TxnEntryHandle.monitor + //!! search for the transaction with the smallest set of leases? -arnold + synchronized void addSibling( Txn txn ) + { + if( queries == null || queries.size() == 0 ) + { + return; + } + Collection sibling = Collections.nCopies( 1, txn ); + Iterator it = queries.keySet().iterator(); + while( it.hasNext() ) + { + QueryWatcher query = (QueryWatcher) it.next(); + if( query != null ) // from a weak map, so might be null + { + monitor.add( query, sibling ); + } + } + } + + /** + * Try to see if this transaction should be aborted. This returns + * true (don't repeat the task) if it knows that the + * transaction is no longer interesting to anyone. + */ + public boolean tryOnce() + { + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} attempt {1} mustQuery:{2}", + new Object[]{ this, new Integer( attempt() ), + new Boolean( mustQuery ) } ); + } + + /* + * The first time we do nothing, since RetryTask invokes run first, + * but we want to wait a bit before testing the transaction. + */ + if( attempt() == 0 ) + { + return false; + } + + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} txn.getState() = {1}", + new Object[]{ this, new Integer( txn.getState() ) } ); + } + + // not active or prepared == no longer blocking + int txnState = txn.getState(); + if( txnState != ACTIVE && txnState != PREPARED ) + { + return true; + } + + // if we're prepared, test every time -- this shouldn't take long + mustQuery |= ( txnState == PREPARED ); + + /* + * Go through the resources to see if we can find one still active + * that cares. 
Must be synchronized since we test, then clear -- + * another thread that set the flag between the test and clear + * would have its requirements lost. + */ + synchronized( this ) + { + if( !mustQuery ) + { // then try resources + if( queries == null ) // no resources, so nobody wants it + { + return false; // try again next time + } + + Iterator it = queries.keySet().iterator(); + boolean foundNeed = false; + + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} nextQuery {1}", + new Object[]{ this, new Long( nextQuery ) } ); + } + + while( it.hasNext() ) + { + QueryWatcher query = (QueryWatcher) it.next(); + if( query == null ) // gone -- the map will reap it + { + continue; + } + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, + "{0} query.getExpiration() {1}", + new Object[]{ this, + new Long( query.getExpiration() ) } ); + } + + if( query.getExpiration() < nextQuery || + query.isResolved() ) + { + it.remove(); // expired, so we don't care about it + } + else + { + foundNeed = true; + break; + } + } + + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} foundNeed = {1}", + new Object[]{ this, new Boolean( foundNeed ) } ); + } + + if( !foundNeed ) // nobody wants it + { + return false; // try again next time + } + } + mustQuery = false; // clear it for next time + } + + /* + * Now we know (a) the transaction itself is alive, and (b) some + * lease still cares. Make sure it's still active as far as the + * it knows, and if it is, then ask the manager about it. + */ + ServerTransaction tr; + try + { + /* This may fix a broken Txn, if it does it won't get moved + * from the broken to the unbroken list. 
It will get + * moved eventually, but it does seem unfortunate it does + * not happen immediately + */ + tr = txn.getTransaction( + monitor.space().getRecoveredTransactionManagerPreparer() ); + } + catch( RemoteException e ) + { + final int cat = ThrowableConstants.retryable( e ); + + if( cat == ThrowableConstants.BAD_INVOCATION || + cat == ThrowableConstants.BAD_OBJECT ) + { + // Not likely to get better, give up + logUnpackingFailure( "definite exception", Level.INFO, + true, e ); + return true; + } + else if( cat == ThrowableConstants.INDEFINITE ) + { + // try, try, again + logUnpackingFailure( "indefinite exception", Levels.FAILED, + false, e ); + mustQuery = true; + return false; + } + else if( cat == ThrowableConstants.UNCATEGORIZED ) + { + // Same as above but log differently. + mustQuery = true; + logUnpackingFailure( "uncategorized exception", Level.INFO, + false, e ); + return false; + } + else + { + logger.log( Level.WARNING, "ThrowableConstants.retryable " + + "returned out of range value, " + cat, + new AssertionError( e ) ); + return false; + } + } + catch( IOException e ) + { + // Not likely to get better + logUnpackingFailure( "IOException", Level.INFO, true, e ); + return true; + } + catch( RuntimeException e ) + { + // Not likely to get better + logUnpackingFailure( "RuntimeException", Level.INFO, true, e ); + return true; + } + catch( ClassNotFoundException e ) + { + // codebase probably down, keep trying + logUnpackingFailure( "ClassNotFoundException", Levels.FAILED, + false, e ); + mustQuery = true; + return false; + } + + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "{0} tr = {1}", new Object[]{ this, tr } ); + } + + int trState; + try + { + trState = tr.getState(); + } + catch( TransactionException e ) + { + if( logger.isLoggable( Level.INFO ) ) + { + logger.log( Level.INFO, "Got TransactionException when " + + "calling getState on " + tr + ", dropping transaction " + + tr.id, e ); + } + trState = ABORTED; + } + 
catch( NoSuchObjectException e ) + { + /* It would be epsilon better to to give up immediately + * if we get a NoSuchObjectException and we are in the + * active state, however, the code to do this would + * be very complicated since we need to hold a lock to + * while reading and acting on the state. + */ + if( ++failCnt >= MAX_FAILURES ) + { + if( logger.isLoggable( Level.INFO ) ) + { + logger.log( Level.INFO, "Got NoSuchObjectException when " + + "calling getState on " + tr + ", this was the " + + failCnt + " RemoteException, dropping transaction" + + tr.id, e ); + } + trState = ABORTED; + } + else + { + if( logger.isLoggable( Levels.FAILED ) ) + { + logger.log( Levels.FAILED, "Got NoSuchObjectException " + + "when calling getState on " + tr + ", failCount = " + + failCnt + ", will retry", e ); + } + mustQuery = true; // keep on trying + return false; // try again next time + } + } + catch( RemoteException e ) + { + if( ++failCnt >= MAX_FAILURES ) + { + /* abort if we are not prepared and not already + * aborted. If prepared retry, otherwise + * we're done. Check state and make any abort() call + * atomically so we can't accidently abort a prepared + * transaction. + */ + synchronized( txn ) + { + switch( txn.getState() ) + { + case ACTIVE: + // Safe to abort, give up + if( logger.isLoggable( Level.INFO ) ) + { + logger.log( Level.INFO, "Got RemoteException " + + "when calling getState on " + tr + ", this " + + "was " + failCnt + " RemoteException, " + + "dropping active transaction " + tr.id, e ); + } + + try + { + monitor.space().abort( tr.mgr, tr.id ); + return true; + } + catch( UnknownTransactionException ute ) + { + throw new AssertionError( ute ); + } + catch( UnmarshalException ume ) + { + throw new AssertionError( ume ); + } + case PREPARED: + final Level l = ( failCnt % MAX_FAILURES == 0 ) ? 
+ Level.INFO : Levels.FAILED; + if( logger.isLoggable( l ) ) + { + logger.log( l, "Got RemoteException when calling " + + "getState on " + tr + ", this was " + + failCnt + " RemoteException, will keep " + + "prepared transaction " + tr.id, e ); + } + + // Can't give up, keep on trying to find real state + mustQuery = true; + return false; + case ABORTED: + case COMMITTED: + // done + return true; + default: + throw new AssertionError( "Txn in unreachable state" ); + } + } + } + else + { + // Don't know, but not ready to give up + if( logger.isLoggable( Levels.FAILED ) ) + { + logger.log( Levels.FAILED, "Got RemoteException when " + + "calling getState on " + tr + ", failCount = " + + failCnt + ", will retry", e ); + } + + mustQuery = true; // keep on trying + return false; // try again next time + } + } + + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, "{0} trState = {1}", + new Object[]{ this, new Integer( trState ) } ); + } + + failCnt = 0; // reset failures -- we got a response + + /* + * If the two states aren't the same, the state changed and we + * need to account for that locally here by calling the method + * that would make the change (the one we should have gotten. + * (We use the external forms of abort, commit, etc., because + * they are what the manager would call, and therefore these + * calls will always do exactly what the incoming manager + * calls would have done. I don't want this to be fragile by + * bypassing those calls and going straight to the Txn + * object's calls, which might skip something important in the + * OutriggerServerImpl calls). 
+ */ + + if( trState != txnState ) + { + if( logger.isLoggable( Level.FINER ) ) + { + logger.log( Level.FINER, + "{0} mgr state[{1}] != local state [{2}]", + new Object[]{ this, + TxnConstants.getName( trState ), + TxnConstants.getName( txnState ) } ); + } + + try + { + switch( trState ) + { + case ABORTED: + logger.log( Level.FINER, "{0} moving to abort", this ); + + monitor.space().abort( tr.mgr, tr.id ); + return true; + + case COMMITTED: + logger.log( Level.FINER, "{0} moving to commit", this ); + + monitor.space().commit( tr.mgr, tr.id ); + return true; + } + } + catch( UnknownTransactionException e ) + { + // we must somehow have already gotten the abort() or + // commit(), and have therefore forgotten about the + // transaction, while this code was executing + return true; + } + catch( UnmarshalException ume ) + { + throw new AssertionError( ume ); + } + + // we can't fake anything else -- the manager will have to call + // us + } + + logger.log( Level.FINEST, "{0} return false", this ); + + return false; // now we know so nothing more to do + } + + /** + * Add in a resource. The lease may already be in, in which case it is + * ignored, or it may be null, in which case it was a non-leased probe + * that was blocked and we simply set mustQuery to + * true. 
+ */ + synchronized void add( QueryWatcher query ) + { + if( query == null || query.getExpiration() <= nextQuery ) + { + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "adding resource to task -- SHORT" ); + } + mustQuery = true; + } + else + { + if( logger.isLoggable( Level.FINEST ) ) + { + logger.log( Level.FINEST, "adding resource to task -- LONG" ); + } + if( queries == null ) + { + queries = new WeakHashMap();// we use it like a WeakHashSet + } + queries.put( query, null ); + } + } + + /** + * Log failed unpacking attempt attempt + */ + private void logUnpackingFailure( String exceptionDescription, Level level, + boolean terminal, Throwable t ) + { + if( logger.isLoggable( level ) ) + { + logger.log( level, "Encountered " + exceptionDescription + + "while unpacking exception to check state, " + + ( terminal ? "dropping" : "keeping" ) + " monitoring task", t ); + } + } + +} Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java?rev=724979&view=auto ============================================================================== --- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java (added) +++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java Tue Dec 9 21:13:53 2008 @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.sun.jini.outrigger; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.jini.core.transaction.server.TransactionConstants; +import net.jini.space.InternalSpaceException; + +/** + * Class that manages transaction related state on behalf of + * EntryHandles. Can accommodate entries locked by + * more than one transaction. The synchronization of this object is + * managed by the EntryHandle that owns it. + * + * @author Sun Microsystems, Inc. + * @see EntryHandle + */ +class TxnState +{ + /** + * The list of known managers. In order to keep things small in the + * common case that there is only one known manager, mgrs + * is managed as a ``list'' in one of two states -- it is either a + * direct reference to the only manager for this handle, or a reference + * to an HashSet with entries for each associated manager. + */ + private Object mgrs; + + /** + * The current state of the handle, such as READ or + * TAKE. + */ + private int state; + + /** + * The holder the handle which owns this object is in + */ + final private EntryHolder holder; + + /** + * Logger for logging information about entry matching + */ + private static final Logger matchingLogger = + Logger.getLogger( OutriggerServerImpl.matchingLoggerName ); + + /** + * Logger for logging transaction related information + */ + private static final Logger txnLogger = + Logger.getLogger( OutriggerServerImpl.txnLoggerName ); + + /** + * Create a new TxnState. 
It will start initially + * with the type of lock indicated by state under the + * transaction managed by mgr. holder is + * the holder the associated entry handle is in. + */ + TxnState( TransactableMgr mgr, int state, EntryHolder holder ) + { + this.mgrs = mgr; + this.state = state; + this.holder = holder; + txnLogger.log( Level.FINER, "TxnState: TxnState: state = {0}", + TransactableMgr.stateNames[ state ] ); + } + + /** + * Prepare to commit this object's part of the transaction. Return + * the prepare's status. + */ + int prepare( TransactableMgr mgr, OutriggerServerImpl space, + EntryHandle owner ) + { + txnLogger.log( Level.FINEST, "TxnState: prepare: state = {0}", + TransactableMgr.stateNames[ state ] ); + + if( state == TransactableMgr.READ ) + { + final int locksLeft = removeMgr( mgr ); + /* If there is more than one lock left resolving this lock + * does not change anything (takes still can't take the + * entry and reads could already) Also need to make sure + * it has not be removed (say by expiration or a cancel) + * since this would cause to send events for entries that + * were removed before it became visible. Also could cause + * ifExists queries to hang on to the entry and block + * indefinitely. If they had already send the EntryTransition + * for the removal. + */ + if( locksLeft <= 1 && !owner.removed() ) + { + // Has the potential to resolve conflicts/generate events + if( locksLeft == 1 ) + { + space.recordTransition( + new EntryTransition( owner, mgr(), true, false, false ) ); + } + else if( locksLeft == 0 ) + { + space.recordTransition( + new EntryTransition( owner, null, true, false, false ) ); + } + else + { + throw new AssertionError( "Fewer than 0 locks left" ); + } + } + return TransactionConstants.NOTCHANGED; + } + + return TransactionConstants.PREPARED; + } + + /** + * Abort this object's part of the transaction. Return true + * if this clears the last transaction associated with this object. 
+ */ + boolean abort( TransactableMgr mgr, OutriggerServerImpl space, + EntryHandle owner ) + { + boolean rslt = true; + + txnLogger.log( Level.FINEST, "TxnState: abort: state = {0}", + TransactableMgr.stateNames[ state ] ); + + if( state == TransactableMgr.READ || state == TransactableMgr.TAKE ) + { + final int locksLeft = removeMgr( mgr ); + rslt = ( locksLeft == 0 ); + /* If there is more than one lock left resolving this + * lock does not change anything (takes still can't + * take the entry and reads could already). Also + * need to make sure it has not be removed (say + * by expiration or a cancel), otherwise + * we could log a transition to visible after + * the removal was logged, this could cause + * ifExists queries to hang on to the entry + * and block indefinitely. (This raises an + * issue if remote events were sent for re-appearance) + */ + if( locksLeft <= 1 && !owner.removed() ) + { + // Has the potential to resolve conflicts/generate events + if( locksLeft == 1 ) + { + /* There could have only been multiple locks if + * they were all read locks, thus the lock the + * abort resolves must h be a read lock, and the + * remaining lock must be a read lock too. + * Therefore this must be an availability but not a + * visibility transition. + */ + assert state == TransactableMgr.READ; + + space.recordTransition( + new EntryTransition( owner, mgr(), true, false, false ) ); + } + else if( locksLeft == 0 ) + { + // Only a visibility transition if the lock being + // dropped was a take lock + final boolean visibility = ( state == TransactableMgr.TAKE ); + + space.recordTransition( + new EntryTransition( owner, null, true, visibility, + false ) ); + } + else + { + throw new AssertionError( "Fewer than 0 locks left" ); + } + } + } + else + { + /* must be a write, make the entry disappear, will + * call recordTransition() + */ + holder.remove( owner, false ); + } + + return rslt; + } + + /** + * Commit this object's part of the transaction. 
The + * space is the OutriggerServerImpl on + * which the operation happens -- some commit operations have + * space-wide side effects (for example, a commit of a + * write operation can cause event notifications for + * clients registered under the transaction's parent). Return true + * if this clears the last transaction associated with this + * object. + */ + boolean commit( TransactableMgr mgr, OutriggerServerImpl space, + EntryHandle owner ) + { + txnLogger.log( Level.FINEST, "TxnState: commit: state = {0}", + TransactableMgr.stateNames[ state ] ); + + switch( state ) + { + case TransactableMgr.WRITE: + //!! assumption -- single level transactions: + //!! we pass null where we should pass the parent txn + if( owner.removed() ) + // if removed() then this entry must + // have been taken as well as written under this + // transaction. + { + return true; + } + + space.recordTransition( + new EntryTransition( owner, null, true, true, true ) ); + return ( removeMgr( mgr ) == 0 ); + + case TransactableMgr.READ: + // Read locked entries should never get prepared. + throw new InternalSpaceException + ( "committing a read locked entry" ); + + case TransactableMgr.TAKE: + // remove calls recordTransition() + holder.remove( owner, false ); + // Resolves a lock + return true; // Take-locked entries only have one Txn + + default: + throw new InternalSpaceException( "unexpected state in " + + "TxnState.commit(): " + state ); + } + } + + /** + * Remove the given mgr from the list of known managers. Return the + * number of mgrs still associated with this entry. + */ + private int removeMgr( TransactableMgr mgr ) + { + if( mgrs == null ) + { + return 0; + } + + if( mgr == mgrs ) + { + mgrs = null; + return 0; + } + + final HashSet tab = (HashSet) mgrs; + tab.remove( mgr ); + return tab.size(); + } + + + /** + * Add mgr to the list of known managers, setting the + * state of this handle to op. 
+ */ + void add( TransactableMgr mgr, int op ) + { + if( mgr == mgrs ) + { + return; // already the only one known + } + + if( mgrs instanceof TransactableMgr ) + { + Object origMgr = mgrs; + mgrs = new HashSet( 7 ); + ( (HashSet) mgrs ).add( origMgr ); + } + + // if mgr is in already this is harmless, and checking to prevent + // a redundant addition is more expensive + ( (HashSet) mgrs ).add( mgr ); + monitor( mgr ); + state = op; + } + + /** + * If we need to, add this manager to the list of transactions that + * need to be monitored because of conflicts over this entry. Any + * existing blocking txn is sufficient. + * + * @see TxnMonitorTask#addSibling + */ + private void monitor( TransactableMgr mgr ) + { + Txn txn = (Txn) mgr; + Iterator it = ( (HashSet) mgrs ).iterator(); + while( it.hasNext() ) + { + Txn otherTxn = (Txn) it.next(); + if( otherTxn != mgr && otherTxn.monitorTask() != null ) + { + otherTxn.monitorTask().addSibling( txn ); + return; + } + } + } + + /** + * It this entry is read locked promote to take locked and return + * true, otherwise return false. Assumes the take is being + * performed under the one transaction that owns a lock on the + * entry. + */ + boolean promoteToTakeIfNeeded() + { + // We can assume our state is ether WRITE or READ, if it was + // take this method would not be called (since take blocks + // other takes in the same transaction canPerform() would have + // returned false) + + if( state == TransactableMgr.WRITE ) + { + return false; + } + + state = TransactableMgr.TAKE; + return true; + } + + /** + * Returns true if the operation op + * under the transaction manager by mgr is legal on + * the associated entry given the operations already performed on + * the entry under other transactions. It is legal to: + *

    + *
  • Read an entry that has been read in any other transaction, + * or that has been written under the same transaction (that is, + * if mgr is the same as the writing transaction). + *
  • Take an entry that was written under the same transaction, or + * which was read only under the same transaction (that is, + * no other active transactions have also read it). + *
+ * All other operations are not legal, so canPerform + * otherwise returns false. + */ + boolean canPerform( TransactableMgr mgr, int op ) + { + //Note: If two transactions are performed + // within the same TransactableMgr + // (see Txn.java), that means its in + // the same Transaction (see Transaction.java). + // Operations under the same Transaction scope + // are the same as if under a null Transaction + // i.e. all operations are visible. + // + // Semantics need to be added for multi-level + // transactions. + + if( matchingLogger.isLoggable( Level.FINER ) ) + { + matchingLogger.log( Level.FINER, + "TxnState: canPerform({0}, {1}): state = {2}", + new Object[]{ mgr, new Integer( op ), + TransactableMgr.stateNames[ state ] } ); + } + + switch( state ) + { + case TransactableMgr.READ: + if( op == TransactableMgr.READ ) + { + return true; + } + return onlyMgr( mgr ); // can only modify if I'm the only participant + case TransactableMgr.WRITE: + if( ( op == TransactableMgr.READ ) || + ( op == TransactableMgr.TAKE ) ) + { + return onlyMgr( mgr ); + } + return false; // can't read unless I'm the writer + case TransactableMgr.TAKE: + return false; // can't take anything taken any time + } + return false; + } + + /** + * Return true if mgr is one of the managers + * known to be managing this entry. + */ + boolean knownMgr( TransactableMgr mgr ) + { + if( mgr == null ) + { + return false; + } + if( mgr == mgrs ) + { + return true; + } + if( mgrs instanceof HashSet ) + { + return ( (HashSet) mgrs ).contains( mgr ); + } + return false; + } + + /** + * Return true if the given manager is the only one + * we know about. 
+ */ + boolean onlyMgr( TransactableMgr mgr ) + { + if( mgr == null ) + { + return false; + } + if( mgr == mgrs ) + { + return true; + } + if( mgrs instanceof HashSet ) + { + HashSet tab = (HashSet) mgrs; + return ( tab.size() == 1 && tab.contains( mgr ) ); + } + return false; + } + + /** + * Add all the managers of this transaction to the given + * collection. + */ + void addTxns( java.util.Collection collection ) + { + if( mgrs == null ) + { + return; + } + + if( mgrs instanceof HashSet ) + { + final HashSet tab = (HashSet) mgrs; + collection.addAll( (HashSet) mgrs ); + return; + } + + collection.add( mgrs ); + } + + /** + * Return true if there are no more transactions associated with + * this object. + */ + boolean empty() + { + if( mgrs == null ) + { + return true; + } + + if( mgrs instanceof HashSet ) + { + return ( (HashSet) mgrs ).isEmpty(); + } + + return false; + } + + /** + * Used by mgr() + */ + final private TransactableMgr[] rslt = new TransactableMgr[1]; + + /** + * Returns the one manager associated with this transaction. + * Throws an AssertionError if there is more or fewer than one + * manager associated with this transaction. + */ + private TransactableMgr mgr() + { + if( mgrs == null ) + { + throw new AssertionError + ( "mgr() called on a TxnState with no manager" ); + } + + if( mgrs instanceof HashSet ) + { + final HashSet tab = (HashSet) mgrs; + if( tab.size() != 1 ) + { + throw new AssertionError + ( "mgr() called on TxnState with more than one manager" ); + } + tab.toArray( rslt ); + return rslt[ 0 ]; + } + + return (TransactableMgr) mgrs; + } +}