river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nic...@apache.org
Subject svn commit: r724979 [10/14] - in /incubator/river/jtsk/skunk/niclas1/services/outrigger: ./ src/ src/main/ src/main/java/ src/main/java/com/ src/main/java/com/sun/ src/main/java/com/sun/jini/ src/main/java/com/sun/jini/outrigger/ src/main/java/com/sun/...
Date Wed, 10 Dec 2008 05:13:55 GMT
Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/Txn.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import com.sun.jini.constants.TxnConstants;
+import com.sun.jini.logging.Levels;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.core.transaction.CannotJoinException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.core.transaction.server.TransactionManager;
+import net.jini.security.ProxyPreparer;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * This class represents a space's state in a single transaction.
+ *
+ * Object of this class represent Jini transactions within outrigger.
+ * These transactions hold "Transactables" -- things that represent
+ * the actions that have been taken under this transaction. For example,
+ * if this transaction were to be cancelled, the Transactables are
+ * examined and serve as the list of things to roll back in order to
+ * restore the state of the Space status quo ante.
+ *
+ * This is achieved by having the transactables notified of state changes
+ * to this transaction such as preparing, commit, etc. The
+ * transactables themselves are responsible for "doing the right thing."
+ *
+ * NB: some--but not all--of the actions one can take with a transaction
+ * are managed internally by these objects. That is, all of the important
+ * methods objects of these types are synchronized. Therefore, two
+ * simultaneous calls to abort() and commit() are arbitrated properly.
+ *
+ * However, care must be taken when add()ing a transactable. Even
+ * though the add() method is synchronized if you check the state of
+ * the transaction to ensure that is active and then call add() the
+ * transaction could have left the active state between the check and
+ * the add() call unless the call obtains the appropriate locks. This
+ * is even more likely if other work needs to be done in addition to
+ * calling add() (e.g. persisting state, obtaining locks, etc.). The
+ * caller of add() should lock the associated transaction object and
+ * ensure that the transaction is still considered ACTIVE, do whatever
+ * work is necessary to complete while the transaction is in the ACTIVE
+ * state (including calling call add()) and then release the lock.
+ * This can be done by :
+ * <ul>
+ * <li> holding the lock on this object while checking the
+ * state and carrying out the operation (including calling add()), or
+ * <li> calling ensureActive() to check the state
+ * and obtain a non-exclusive lock, carrying out the operation
+ * (including calling add()), and then calling allowStateChange() to
+ * release the lock.
+ * </ul>
+ * The pair of ensureActive() and allowStateChange() allows for more
+ * concurrency if the operation is expected to take a long time, in
+ * that it will allow for other operations to be performed under the
+ * same transaction and let aborts prevent other operations from
+ * being started.
+ *
+ * @author Sun Microsystems, Inc.
+ */
+class Txn implements TransactableMgr, TransactionConstants, StorableObject
+{
+
+    /**
+     * The internal id Outrigger as assigned to the transaction
+     */
+    final private long id;
+
+    /**
+     * What state we think the transaction is in
+     */
+    private int state;
+
+    /**
+     * The transaction manager associated with the transaction
+     * this object is fronting for.
+     */
+    private StorableReference trm;
+
+    /**
+     * Cached <code>ServerTransaction</code> object for
+     * the transaction this object is fronting for.
+     */
+    private ServerTransaction tr;
+
+    /**
+     * The id the transaction manager assigned to this transaction
+     */
+    private long trId;
+
+    /**
+     * The list of <code>Transactable</code> participating in
+     * this transaction.
+     */
+    final private List txnables = new java.util.LinkedList();
+
+    /**
+     * The task responsible for monitoring to see if this
+     * transaction has been aborted with us being told, or
+     * null if no such task as been allocated.
+     */
+    private TxnMonitorTask monitorTask;
+
+    /**
+     * Count of number of threads holding a read lock on state
+     */
+    private int stateReaders = 0;
+
+    /**
+     * <code>true</code> if there is a blocked state change. Used
+     * to give writers priority.
+     */
+    private boolean stateChangeWaiting = false;
+
+    /**
+     * Logger for logging transaction related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.txnLoggerName );
+
+    /**
+     * Create a new <code>Txn</code> that represents our state in the
+     * given <code>ServerTransaction</code>.
+     */
+    Txn( ServerTransaction tr, long id )
+    {
+        this( id );
+        trId = tr.id;
+        this.tr = tr;
+        this.trm = new StorableReference( tr.mgr );
+        state = ACTIVE;
+
+        if( logger.isLoggable( Level.FINER ) )
+        {
+            logger.log( Level.FINER, "creating txn for transaction mgr:" +
+                                     "{0}, id:{1}, state:{2}",
+                        new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+        }
+    }
+
+    /**
+     * Used in recovery
+     */
+    Txn( long id )
+    {
+        this.id = id;        // the txn id is not persisted
+    }
+
+    /**
+     * Get the id for this txn. Note that this id is NOT the same as
+     * the ID of the transaction. Since that ID is not unique (it must
+     * be qualified with the <code>ServerTransaction</code> object) we create
+     * our own internal id to make txns unique. This is needed since we
+     * may not have the <code>Transaction</code> unmarshalled.
+     */
+    Long getId()
+    {
+        return new Long( id );
+    }
+
+    /**
+     * We keep the transaction ID around because we may need it
+     * to identify a broken transaction after recovery.
+     */
+    long getTransactionId()
+    {
+        return trId;
+    }
+
+    /**
+     * Return our local view of the current state. Need to be holding
+     * the lock on this object or have called <code>ensureActive</code>
+     * to get the current value.
+     */
+    int getState()
+    {
+        return state;
+    }
+
+    /**
+     * Atomically checks that this transaction is in the active
+     * state and locks the transaction in the active state.
+     * The lock can be released by calling <code>allowStateChange</code>.
+     * Each call to this method should be paired with a call to
+     * <code>allowStateChange</code> in a finally block.
+     *
+     * @throws CannotJoinException if the transaction
+     *                             is not active or a state change is pending.
+     */
+    synchronized void ensureActive() throws CannotJoinException
+    {
+        if( state != ACTIVE || stateChangeWaiting )
+        {
+            final String msg = "transaction mgr:" + tr + ", id:" + trId +
+                               " not active, in state " + TxnConstants.getName( state );
+            final CannotJoinException e = new CannotJoinException( msg );
+            logger.log( Levels.FAILED, msg, e );
+            throw e;
+        }
+        assert stateReaders >= 0;
+        stateReaders++;
+    }
+
+    /**
+     * Release the read lock created by an <code>ensureActive</code>
+     * call. Does nothing if the transaction is not active or there is
+     * a state change pending and thus is safe to call even if the
+     * corresponding <code>ensureActive</code> call threw
+     * <code>CannotJoinException</code>.
+     */
+    synchronized void allowStateChange()
+    {
+        if( state != ACTIVE || stateChangeWaiting )
+        {
+            return;
+        }
+        stateReaders--;
+        assert stateReaders >= 0;
+        notifyAll();
+    }
+
+    /**
+     * Prevents new operations from being started under this
+     * transaction and blocks until in process operations are
+     * completed.
+     */
+    synchronized void makeInactive()
+    {
+        stateChangeWaiting = true;
+        assert stateReaders >= 0;
+        while( stateReaders != 0 )
+        {
+            try
+            {
+                wait();
+            }
+            catch( InterruptedException e )
+            {
+                throw new AssertionError( e );
+            }
+            assert stateReaders >= 0;
+        }
+    }
+
+    /**
+     * Prepare for transaction commit. <code>makeInactive</code> must have
+     * been called on this transaction first.
+     */
+    synchronized int prepare( OutriggerServerImpl space )
+    {
+        assert stateChangeWaiting : "prepare called before makeInactive";
+        assert stateReaders == 0 : "prepare called before makeInactive completed";
+
+        if( logger.isLoggable( Level.FINER ) )
+        {
+            logger.log( Level.FINER, "prepare: transaction mgr:{0}, id:{1}, " +
+                                     "state:{2}",
+                        new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+        }
+
+        switch( state )
+        {
+        case ABORTED:                   // previously aborted
+            return ABORTED;
+
+        case COMMITTED:               // previously committed
+            throw new IllegalStateException(); // "cannot happen"
+
+        case NOTCHANGED:               // previously voted NOTCHANGED
+        case PREPARED:               // previously voted PREPARED
+            return state;               // they are idempotent, and
+        // and we have nothing to do
+        // so return
+
+        case ACTIVE:                   // currently active
+            boolean changed = false;           // did this txn change
+            // anything?
+
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "prepare:preparing transaction mgr:" +
+                                          "{0}, id:{1}, state:{2}",
+                            new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+            }
+
+            // loop through Transactable members of this Txn
+            final Iterator i = txnables.iterator();
+            int c = 0; // Counter for debugging message
+            while( i.hasNext() )
+            {
+                // get this member's vote
+                final Transactable transactable = (Transactable) i.next();
+                final int prepState = transactable.prepare( this, space );
+                if( logger.isLoggable( Level.FINEST ) )
+                {
+                    logger.log( Level.FINEST, "prepare:prepared " +
+                                              "transactable {0} for transaction mgr:{1}, id:{2}," +
+                                              " transactable now in state {3}",
+                                new Object[]{ transactable, tr, new Long( trId ),
+                                              TxnConstants.getName( prepState ) } );
+                }
+
+                switch( prepState )
+                {
+                case PREPARED:         // has prepared state
+                    changed = true;         // this means a change
+                    continue;
+
+                case ABORTED:             // has to abort
+                    abort( space );         // abort this txn (does cleanup)
+                    state = ABORTED;
+                    return state;         // vote aborted
+
+                case NOTCHANGED:         // no change
+                    i.remove();              // Won't need to call again
+                    continue;
+
+                default:             // huh?
+                    throw new
+                        InternalSpaceException( "prepare said " + prepState );
+                }
+            }
+
+            if( changed )
+            {
+                state = PREPARED;
+                // have to watch this since it's now holding permanent
+                // resources
+                space.monitor( Collections.nCopies( 1, this ) );
+            }
+            else
+            {
+                state = NOTCHANGED;
+            }
+            break;
+
+        default:
+            throw new IllegalStateException( "unknown Txn state: " + state );
+        }
+
+        return state;
+    }
+
+    /**
+     * Abort the transaction.  This must be callable from
+     * <code>prepare</code> because if a <code>Transactable</code>
+     * votes <code>ABORT</code>, this method is called to make that
+     * happen. <code>makeInactive</code> must have been called on this
+     * transaction first.
+     */
+    synchronized void abort( OutriggerServerImpl space )
+    {
+        assert stateChangeWaiting : "abort called before makeInactive";
+        assert stateReaders == 0 : "abort called before makeInactive completed";
+
+        if( logger.isLoggable( Level.FINER ) )
+        {
+            logger.log( Level.FINER, "abort: transaction mgr:{0}, id:{1}, " +
+                                     "state:{2}",
+                        new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+        }
+
+        switch( state )
+        {
+        case ABORTED:        // already aborted
+        case NOTCHANGED:    // nothing to abort
+            break;
+
+        case COMMITTED:    // "cannot happen"
+            throw new IllegalStateException( "aborting a committed txn" );
+
+        case ACTIVE:
+        case PREPARED:
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "abort:aborting transaction mgr:" +
+                                          "{0}, id:{1}, state:{2}",
+                            new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+            }
+
+            final Iterator i = txnables.iterator();
+            while( i.hasNext() )
+            {
+                ( (Transactable) i.next() ).abort( this, space );
+            }
+            state = ABORTED;
+            cleanup();
+            break;
+
+        default:
+            throw new IllegalStateException( "unknown Txn state: " + state );
+        }
+    }
+
+    /**
+     * Having prepared, roll the changes
+     * forward. <code>makeInactive</code> must have been called on
+     * this transaction first.
+     */
+    synchronized void commit( OutriggerServerImpl space )
+    {
+        assert stateChangeWaiting : "commit called before makeInactive";
+        assert stateReaders == 0 : "commit called before makeInactive completed";
+
+        //!! Need to involve mgr here
+        if( logger.isLoggable( Level.FINER ) )
+        {
+            logger.log( Level.FINER, "commit: transaction mgr:{0}, id:{1}, " +
+                                     "state:{2}",
+                        new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+        }
+
+        switch( state )
+        {
+        case ABORTED:        // "cannot happen" stuff
+        case ACTIVE:
+        case NOTCHANGED:
+            throw new IllegalStateException( "committing "
+                                             + TxnConstants.getName( state ) + " txn" );
+
+        case COMMITTED:    // previous committed, that's okay
+            return;
+
+        case PREPARED:    // voted PREPARED, time to finish up
+            final Iterator i = txnables.iterator();
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "commit:committing transaction mgr:" +
+                                          "{0}, id:{1}, state:{2}",
+                            new Object[]{ tr, new Long( trId ), TxnConstants.getName( state ) } );
+            }
+
+            while( i.hasNext() )
+            {
+                ( (Transactable) i.next() ).commit( this, space );
+            }
+            state = COMMITTED;
+            cleanup();
+            return;
+
+        default:
+            throw new IllegalStateException( "unknown Txn state: " + state );
+        }
+    }
+
+    /**
+     * Caution: see locking discussion at the class level.
+     */
+    public synchronized Transactable add( Transactable t )
+    {
+        txnables.add( t );
+        return t;
+    }
+
+    // inherit doc comment
+    public ServerTransaction getTransaction( ProxyPreparer preparer )
+        throws IOException, ClassNotFoundException
+    {
+        if( tr == null )
+        {
+            final TransactionManager mgr =
+                (TransactionManager) trm.get( preparer );
+            tr = new ServerTransaction( mgr, trId );
+        }
+        return tr;
+    }
+
+    /**
+     * Return the manager associated with this transaction.
+     *
+     * @return the manager associated with this transaction.
+     * @throws IllegalStateException if this <code>Txn</code>
+     *                               is still broken.
+     */
+    TransactionManager getManager()
+    {
+        if( tr == null )
+        {
+            throw new IllegalStateException( "Txn is still broken" );
+        }
+        return tr.mgr;
+    }
+
+    /**
+     * Return the monitor task for this object. Note, this
+     * method is unsynchronized because it (and
+     * <code>monitorTask(TxnMonitorTask)</code> are both called
+     * from the same thread.
+     */
+    TxnMonitorTask monitorTask()
+    {
+        return monitorTask;
+    }
+
+    /**
+     * Set the monitor task for this object. Note, this method is
+     * unsynchronized because it (and <code>monitorTask()</code> are
+     * both called from the same thread.
+     */
+    void monitorTask( TxnMonitorTask task )
+    {
+        monitorTask = task;
+    }
+
+    /**
+     * Clean up any state when the transaction is finished.
+     */
+    private void cleanup()
+    {
+        if( monitorTask != null )
+        {
+            monitorTask.cancel();    // stop doing this
+        }
+    }
+
+    // -----------------------------------
+    //  Methods required by StorableObject
+    // -----------------------------------
+
+    // inherit doc comment
+
+    public void store( ObjectOutputStream out ) throws IOException
+    {
+        /* There is a bunch of stuff we don't need to write. The
+       * Txn id not stored since it is handed back during
+       * recovery. The content is rebuilt txnables by the various
+       * recoverWrite and recoverTake calls. state is not written
+       * because it is always ACTIVE when we write, and always
+       * needs to be PREPARED when we read it back.
+       */
+        out.writeObject( trm );
+        out.writeLong( trId );
+    }
+
+    // inherit doc comment
+    public void restore( ObjectInputStream in )
+        throws IOException, ClassNotFoundException
+    {
+        /* Only transactions that got prepared and not committed or
+       * aborted get recovered
+       */
+        state = PREPARED;
+        trm = (StorableReference) in.readObject();
+        trId = in.readLong();
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitor.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import com.sun.jini.config.Config;
+import com.sun.jini.thread.TaskManager;
+import com.sun.jini.thread.WakeupManager;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.config.Configuration;
+import net.jini.config.ConfigurationException;
+
+/**
+ * This class provides a driver for monitoring the state of transactions
+ * that have blocked progress of other operations recently.  It creates
+ * tasks that monitor each transaction by intermittently querying the
+ * transaction's state.  If it finds that the transaction has aborted,
+ * it makes sure that the local space aborts the transaction, too, so
+ * that operations will cease to be blocked by the transaction.
+ *
+ * @author Sun Microsystems, Inc.
+ * @see TxnMonitorTask
+ * @see OutriggerServerImpl#monitor
+ */
+class TxnMonitor implements Runnable
+{
+    /**
+     * Each <code>ToMonitor</code> object represents a need to monitor
+     * the given transactions, possibly under a lease.
+     *
+     * @see #pending
+     */
+    private static class ToMonitor
+    {
+        QueryWatcher query;         // query governing interest in txns
+        Collection txns;           // the transactions to monitor
+
+        ToMonitor( QueryWatcher query, Collection txns )
+        {
+            this.query = query;
+            this.txns = txns;
+        }
+    }
+
+    /**
+     * This list is used to contain requests to monitor interfering
+     * transactions.  We use a list like this so that the
+     * <code>getMatch</code> request that detected the conflict
+     * doesn't have to wait for all the setup before returning -- it
+     * just puts the data on this list and the <code>TxnMonitor</code>
+     * pulls it off using its own thread.
+     *
+     * @see #ToMonitor
+     * @see OutriggerServerImpl#getMatch
+     */
+    private LinkedList pending = new LinkedList();
+
+    /**
+     * wakeup manager for <code>TxnMonitorTask</code>s
+     */
+    private final WakeupManager wakeupMgr =
+        new WakeupManager( new WakeupManager.ThreadDesc( null, true ) );
+
+    /**
+     * The manager for <code>TxnMonitorTask</code> objects.
+     */
+    private TaskManager taskManager;
+
+    /**
+     * The space we belong to.  Needed for aborts.
+     */
+    private OutriggerServerImpl space;
+
+    /**
+     * The thread running us.
+     */
+    private Thread ourThread;
+
+    /**
+     * Set when we are told to stop
+     */
+    private boolean die = false;
+
+    /**
+     * Logger for logging transaction related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.txnLoggerName );
+
+    /**
+     * Create a new TxnMonitor.
+     */
+    TxnMonitor( OutriggerServerImpl space, Configuration config )
+        throws ConfigurationException
+    {
+        if( space == null )
+        {
+            throw new NullPointerException( "space must be non-null" );
+        }
+        this.space = space;
+
+        taskManager = (TaskManager) Config.getNonNullEntry( config,
+                                                            OutriggerServerImpl.COMPONENT_NAME, "txnMonitorTaskManager",
+                                                            TaskManager.class, new TaskManager() );
+
+        ourThread = new Thread( this, "TxnMonitor" );
+        ourThread.setDaemon( true );
+        ourThread.start();
+    }
+
+    public void destroy()
+    {
+        taskManager.terminate();
+        wakeupMgr.stop();
+
+        synchronized( this )
+        {
+            die = true;
+            notifyAll();
+        }
+
+        try
+        {
+            ourThread.join();
+        }
+        catch( InterruptedException ie )
+        {
+            // ignore
+        }
+    }
+
+    /**
+     * Return the space we're part of.
+     */
+    OutriggerServerImpl space()
+    {
+        return space;
+    }
+
+    /**
+     * Add a set of <code>transactions</code> to be monitored under the
+     * given query.
+     */
+    synchronized void add( QueryWatcher query, Collection transactions )
+    {
+        if( logger.isLoggable( Level.FINEST ) )
+        {
+            final StringBuffer buf = new StringBuffer();
+            buf.append( "Setting up monitor for " );
+            buf.append( query );
+            buf.append( " toMonitor:" );
+            boolean notFirst = false;
+            for( Iterator i = transactions.iterator(); i.hasNext(); )
+            {
+                if( notFirst )
+                {
+                    buf.append( "," );
+                    notFirst = true;
+                }
+                buf.append( i.next() );
+            }
+            logger.log( Level.FINEST, buf.toString() );
+        }
+
+        pending.add( new ToMonitor( query, transactions ) );
+        notifyAll();
+    }
+
+    /**
+     * Add a set of <code>transactions</code> to be monitored under no
+     * lease.
+     */
+    void add( Collection transactions )
+    {
+        add( null, transactions );
+    }
+
+    /**
+     * Take pending monitor requests off the queue, creating the
+     * required <code>TxnMonitorTask</code> objects and scheduling them.
+     */
+    public void run()
+    {
+        try
+        {
+            ToMonitor tm;
+            for( ; ; )
+            {
+                synchronized( this )
+                {
+
+                    // Sleep if nothing is pending.
+                    while( pending.isEmpty() && !die )
+                    {
+                        wait();
+                    }
+
+                    if( die )
+                    {
+                        return;
+                    }
+
+                    tm = (ToMonitor) pending.removeFirst();
+                }
+
+                logger.log( Level.FINER, "creating monitor tasks for {0}",
+                            tm.query );
+
+                Iterator it = tm.txns.iterator();
+                while( it.hasNext() )
+                {
+                    Txn txn = (Txn) it.next();
+                    TxnMonitorTask task = taskFor( txn );
+                    task.add( tm.query );
+                }
+            }
+        }
+        catch( InterruptedException e )
+        {
+            return;
+        }
+    }
+
+    /**
+     * Return the monitor task for this transaction, creating it if
+     * necessary.
+     */
+    private TxnMonitorTask taskFor( Txn txn )
+    {
+        TxnMonitorTask task = txn.monitorTask();
+        if( task == null )
+        {
+            logger.log( Level.FINER, "creating TxnMonitorTask for {0}",
+                        txn );
+
+            task = new TxnMonitorTask( txn, this, taskManager, wakeupMgr );
+            txn.monitorTask( task );
+            taskManager.add( task );  // add it after we've set it in the txn
+        }
+        return task;
+    }
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnMonitorTask.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import com.sun.jini.constants.ThrowableConstants;
+import com.sun.jini.constants.TxnConstants;
+import com.sun.jini.logging.Levels;
+import com.sun.jini.thread.RetryTask;
+import com.sun.jini.thread.TaskManager;
+import com.sun.jini.thread.WakeupManager;
+import java.io.IOException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.RemoteException;
+import java.rmi.UnmarshalException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.core.transaction.TransactionException;
+import net.jini.core.transaction.UnknownTransactionException;
+import net.jini.core.transaction.server.ServerTransaction;
+import net.jini.core.transaction.server.TransactionConstants;
+
+/**
+ * A task that will try to validate the state of a transaction.  This
+ * uses weak references a good deal to let the other parts of the system
+ * be GC'ed as necessary.
+ * <p>
+ * The retry mechanism is subtle, so bear with me.  The purpose is
+ * to ensure that if any activity is being blocked by a given
+ * transaction, that transaction will be tested at some point in
+ * the future (if necessary, i.e., if it still is thought to be
+ * active).  We assume it to be rare that a transactions that the
+ * space thinks is active is, in fact, aborted, so the algorithm is
+ * designed to guarantee the detection without a lot of overhead,
+ * specifically without a lot of RMI calls.
+ * <p>
+ * Each task has three values: a <code>nextQuery</code> time, a
+ * <code>mustQuery</code> boolean that force the next query to be
+ * made, and <code>deltaT</code>, the time at which the following
+ * query will be scheduled.  When the task is awakened at its
+ * <code>nextQuery</code> time, it checks to see if it must make an
+ * actual query to the transaction manager, which it will do if either
+ * <code>mustQuery</code> is <code>true</code>, or if we know about
+ * any in progress queries on the space that are blocked on the
+ * transaction.  Whether or not an actual query is made,
+ * <code>deltaT</code> is added to <code>nextQuery</code> to get the
+ * <code>nextQuery</code> time, <code>deltaT</code> is doubled, and
+ * <code>mustQuery</code> boolean is set to <code>false</code>.
+ * <p>
+ * There are two kinds of requests that a with which transaction
+ * can cause a conflict -- those with long timeouts (such as
+ * blocking reads and takes) and those that are under short timeouts
+ * (such as reads and takes with zero-length timeouts).  We will
+ * treat them separately at several points of the algorithm.  A
+ * short timeout is any query whose expiration time is sooner than
+ * the <code>nextQuery</code> time.  Any other timeout is long
+ * If a short query arrives, <code>mustQuery</code> is set to
+ * <code>true</code>.
+ * <p>
+ * The result is that any time a transaction causes a conflict, if
+ * the query on the space has not ended by the time of the
+ * <code>nextQuery</code> we will attempt to poll the transaction manager.
+ * There will also poll the transaction manager if any conflict occurred
+ * on a query on the space with a short timeout.
+ * <p>
+ * The first time a transaction causes a conflict, we schedule a
+ * time in the future at which we will poll its status.  We do not
+ * poll right away because often a transaction will complete on
+ * its own before we get to that time, making the check
+ * unnecessary.  An instant poll is, therefore, unnecessarily
+ * aggressive, since giving an initial grace time will usually mean
+ * no poll is made at all.  So if the first conflict occurs at
+ * <i>T</i><sub>0</sub>, the <code>nextQuery</code> value will be
+ * <i>T</i><sub>0</sub><code>+INITIAL_GRACE</code>, the boolean
+ * will be <code>true</code> to force that poll to happen, and
+ * <code>deltaT</code> will be set to <code>INITIAL_GRACE</code>.
+ *
+ * @author Sun Microsystems, Inc.
+ * @see TxnMonitor
+ */
+class TxnMonitorTask extends RetryTask
+    implements TransactionConstants, com.sun.jini.constants.TimeConstants
+{
+    /**
+     * transaction being monitored
+     */
+    private final Txn txn;
+
+    /**
+     * the monitor we were made by
+     */
+    private final TxnMonitor monitor;
+
+    /**
+     * All the queries on the space (not queries to the transaction
+     * manager) waiting for <code>txn</code> to be resolved.
+     * <code>null</code> until we have at least one. Represented by
+     * <code>QueryWatcher</code> objects.
+     */
+    private Map queries;
+
+    /**
+     * count of RemoteExceptions
+     */
+    private int failCnt;
+
+    /**
+     * The next time we need to poll the transaction manager
+     * to get <code>txn</code>'s actual state.
+     */
+    private long nextQuery;
+
+    /**
+     * When we're given an opportunity to poll the transaction manager
+     * for the <code>txn</code>'s state, do so.
+     */
+    private boolean mustQuery;
+
+    /**
+     * next value added to <code>nextQuery</code>
+     */
+    private long deltaT;
+
+    /**
+     * The initial grace period before the first query.
+     */
+    private static final long INITIAL_GRACE = 15 * SECONDS;
+
+    /**
+     * The retry time when we have an encountered an exception
+     */
+    private static final long BETWEEN_EXCEPTIONS = 15 * SECONDS;
+
+    /**
+     * The largest value that <code>deltaT</code> will reach.
+     */
+    private static final long MAX_DELTA_T = 1 * HOURS;
+
+    /**
+     * The maximum number of failures allowed in a row before we simply
+     * give up on the transaction and consider it aborted.
+     */
+    private static final int MAX_FAILURES = 3;
+
+    /**
+     * Logger for logging transaction related information
+     */
+    private static final Logger logger =
+        Logger.getLogger( OutriggerServerImpl.txnLoggerName );
+
+    /**
+     * Create a new TxnMonitorTask.
+     */
+    TxnMonitorTask( Txn txn, TxnMonitor monitor,
+                    TaskManager manager, WakeupManager wakeupMgr )
+    {
+        super( manager, wakeupMgr );
+        this.txn = txn;
+        this.monitor = monitor;
+        nextQuery = startTime();    // retryTime will add INITIAL_GRACE
+        deltaT = INITIAL_GRACE;
+        mustQuery = true;
+    }
+
+    /**
+     * Return the time of the next query, bumping <code>deltaT</code> as
+     * necessary for the next iteration.  If the transaction has voted
+     * <code>PREPARED</code> or the manager has been giving us a
+     * <code>RemoteException</code>, we should retry on short times;
+     * otherwise we back off quickly.
+     */
+    public long retryTime()
+    {
+        if( failCnt == 0 && txn.getState() != PREPARED )
+        {      // no failures
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "{0} retryTime adds {1}",
+                            new Object[]{ this, new Long( deltaT ) } );
+            }
+
+            nextQuery += deltaT;
+            if( deltaT < MAX_DELTA_T )
+            {
+                deltaT = Math.min( deltaT * 2, MAX_DELTA_T );
+            }
+        }
+        else
+        {
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "{0} retryTime adds {1} (for {2})",
+                            new Object[]{ this, new Long( BETWEEN_EXCEPTIONS ),
+                                          ( failCnt != 0 ? "failure" : "PREPARED" ) } );
+            }
+            nextQuery += BETWEEN_EXCEPTIONS;
+        }
+        return nextQuery;
+    }
+
+    /**
+     * We can run in parallel with any task, so just return
+     * <CODE>false</CODE>.
+     */
+    public boolean runAfter( java.util.List tasks, int size )
+    {
+        return false;
+    }
+
+    /**
+     * Add a ``sibling'' transaction, one that is now blocking progress
+     * on one of the same entries.  For example, if a client is blocked
+     * on a <code>read</code>, another transaction can read the same
+     * entry, thereby also blocking that same client.  This means that
+     * the transaction for the second <code>read</code> must be
+     * watched, too.  The list of queries for the second transaction
+     * might be less that the list of those in this transaction, but
+     * the process of figuring out the subset is too expensive, since
+     * we have tried to make the checking process itself cheap,
+     * anyway.  So we add all queries this task is currently monitoring
+     * to the task monitoring the second transaction.  If there are
+     * no queries, then the blocking occurred because of a short query
+     * or all the queries have expired, in which case the second transaction
+     * isn't blocking the way of anything currently, so this method does
+     * nothing.
+     * <p>
+     * Of course, in order to avoid blocking the thread that is calling
+     * this (which is trying to perform a <code>read</code>, after
+     * all), we simply add each lease in this task to the monitor's
+     * queue.
+     *
+     * @see TxnEntryHandle#monitor
+     */
+    //!! Would it be worth the overhead to make TxnEntryHandle.monitor
+    //!! search for the transaction with the smallest set of leases?  -arnold
+    synchronized void addSibling( Txn txn )
+    {
+        if( queries == null || queries.size() == 0 )
+        {
+            return;
+        }
+        Collection sibling = Collections.nCopies( 1, txn );
+        Iterator it = queries.keySet().iterator();
+        while( it.hasNext() )
+        {
+            QueryWatcher query = (QueryWatcher) it.next();
+            if( query != null )    // from a weak map, so might be null
+            {
+                monitor.add( query, sibling );
+            }
+        }
+    }
+
+    /**
+     * Try to see if this transaction should be aborted.  This returns
+     * <code>true</code> (don't repeat the task) if it knows that the
+     * transaction is no longer interesting to anyone.
+     */
+    public boolean tryOnce()
+    {
+        if( logger.isLoggable( Level.FINEST ) )
+        {
+            logger.log( Level.FINEST, "{0} attempt {1} mustQuery:{2}",
+                        new Object[]{ this, new Integer( attempt() ),
+                                      new Boolean( mustQuery ) } );
+        }
+
+        /*
+       * The first time we do nothing, since RetryTask invokes run first,
+       * but we want to wait a bit before testing the transaction.
+       */
+        if( attempt() == 0 )
+        {
+            return false;
+        }
+
+        if( logger.isLoggable( Level.FINEST ) )
+        {
+            logger.log( Level.FINEST, "{0} txn.getState() = {1}",
+                        new Object[]{ this, new Integer( txn.getState() ) } );
+        }
+
+        // not active or prepared == no longer blocking
+        int txnState = txn.getState();
+        if( txnState != ACTIVE && txnState != PREPARED )
+        {
+            return true;
+        }
+
+        // if we're prepared, test every time -- this shouldn't take long
+        mustQuery |= ( txnState == PREPARED );
+
+        /*
+       * Go through the resources to see if we can find one still active
+       * that cares.  Must be synchronized since we test, then clear --
+       * another thread that set the flag between the test and clear
+       * would have its requirements lost.
+       */
+        synchronized( this )
+        {
+            if( !mustQuery )
+            {        // then try resources
+                if( queries == null )    // no resources, so nobody wants it
+                {
+                    return false;    // try again next time
+                }
+
+                Iterator it = queries.keySet().iterator();
+                boolean foundNeed = false;
+
+                if( logger.isLoggable( Level.FINEST ) )
+                {
+                    logger.log( Level.FINEST, "{0} nextQuery {1}",
+                                new Object[]{ this, new Long( nextQuery ) } );
+                }
+
+                while( it.hasNext() )
+                {
+                    QueryWatcher query = (QueryWatcher) it.next();
+                    if( query == null )     // gone -- the map will reap it
+                    {
+                        continue;
+                    }
+                    if( logger.isLoggable( Level.FINEST ) )
+                    {
+                        logger.log( Level.FINEST,
+                                    "{0} query.getExpiration() {1}",
+                                    new Object[]{ this,
+                                                  new Long( query.getExpiration() ) } );
+                    }
+
+                    if( query.getExpiration() < nextQuery ||
+                        query.isResolved() )
+                    {
+                        it.remove();    // expired, so we don't care about it
+                    }
+                    else
+                    {
+                        foundNeed = true;
+                        break;
+                    }
+                }
+
+                if( logger.isLoggable( Level.FINEST ) )
+                {
+                    logger.log( Level.FINEST, "{0} foundNeed = {1}",
+                                new Object[]{ this, new Boolean( foundNeed ) } );
+                }
+
+                if( !foundNeed )        // nobody wants it
+                {
+                    return false;    // try again next time
+                }
+            }
+            mustQuery = false;        // clear it for next time
+        }
+
+        /*
+       * Now we know (a) the transaction itself is alive, and (b) some
+       * lease still cares.  Make sure it's still active as far as the
+       * it knows, and if it is, then ask the manager about it.
+       */
+        ServerTransaction tr;
+        try
+        {
+            /* This may fix a broken Txn, if it does it won't get moved
+            * from the broken to the unbroken list. It will get
+            * moved eventually, but it does seem unfortunate it does
+            * not happen immediately
+            */
+            tr = txn.getTransaction(
+                monitor.space().getRecoveredTransactionManagerPreparer() );
+        }
+        catch( RemoteException e )
+        {
+            final int cat = ThrowableConstants.retryable( e );
+
+            if( cat == ThrowableConstants.BAD_INVOCATION ||
+                cat == ThrowableConstants.BAD_OBJECT )
+            {
+                // Not likely to get better, give up
+                logUnpackingFailure( "definite exception", Level.INFO,
+                                     true, e );
+                return true;
+            }
+            else if( cat == ThrowableConstants.INDEFINITE )
+            {
+                // try, try, again
+                logUnpackingFailure( "indefinite exception", Levels.FAILED,
+                                     false, e );
+                mustQuery = true;
+                return false;
+            }
+            else if( cat == ThrowableConstants.UNCATEGORIZED )
+            {
+                // Same as above but log differently.
+                mustQuery = true;
+                logUnpackingFailure( "uncategorized exception", Level.INFO,
+                                     false, e );
+                return false;
+            }
+            else
+            {
+                logger.log( Level.WARNING, "ThrowableConstants.retryable " +
+                                           "returned out of range value, " + cat,
+                            new AssertionError( e ) );
+                return false;
+            }
+        }
+        catch( IOException e )
+        {
+            // Not likely to get better
+            logUnpackingFailure( "IOException", Level.INFO, true, e );
+            return true;
+        }
+        catch( RuntimeException e )
+        {
+            // Not likely to get better
+            logUnpackingFailure( "RuntimeException", Level.INFO, true, e );
+            return true;
+        }
+        catch( ClassNotFoundException e )
+        {
+            // codebase probably down, keep trying
+            logUnpackingFailure( "ClassNotFoundException", Levels.FAILED,
+                                 false, e );
+            mustQuery = true;
+            return false;
+        }
+
+        if( logger.isLoggable( Level.FINEST ) )
+        {
+            logger.log( Level.FINEST, "{0} tr = {1}", new Object[]{ this, tr } );
+        }
+
+        int trState;
+        try
+        {
+            trState = tr.getState();
+        }
+        catch( TransactionException e )
+        {
+            if( logger.isLoggable( Level.INFO ) )
+            {
+                logger.log( Level.INFO, "Got TransactionException when " +
+                                        "calling getState on " + tr + ", dropping transaction " +
+                                        tr.id, e );
+            }
+            trState = ABORTED;
+        }
+        catch( NoSuchObjectException e )
+        {
+            /* It would be epsilon better to to give up immediately
+            * if we get a NoSuchObjectException and we are in the
+            * active state, however, the code to do this would
+            * be very complicated since we need to hold a lock to
+            * while reading and acting on the state.
+            */
+            if( ++failCnt >= MAX_FAILURES )
+            {
+                if( logger.isLoggable( Level.INFO ) )
+                {
+                    logger.log( Level.INFO, "Got NoSuchObjectException when " +
+                                            "calling getState on " + tr + ", this was the " +
+                                            failCnt + " RemoteException, dropping transaction" +
+                                            tr.id, e );
+                }
+                trState = ABORTED;
+            }
+            else
+            {
+                if( logger.isLoggable( Levels.FAILED ) )
+                {
+                    logger.log( Levels.FAILED, "Got NoSuchObjectException " +
+                                               "when calling getState on " + tr + ", failCount = " +
+                                               failCnt + ", will retry", e );
+                }
+                mustQuery = true;      // keep on trying
+                return false;           // try again next time
+            }
+        }
+        catch( RemoteException e )
+        {
+            if( ++failCnt >= MAX_FAILURES )
+            {
+                /* abort if we are not prepared and not already
+             * aborted. If prepared retry, otherwise
+             * we're done. Check state and make any abort() call
+             * atomically so we can't accidently abort a prepared
+             * transaction.
+             */
+                synchronized( txn )
+                {
+                    switch( txn.getState() )
+                    {
+                    case ACTIVE:
+                        // Safe to abort, give up
+                        if( logger.isLoggable( Level.INFO ) )
+                        {
+                            logger.log( Level.INFO, "Got RemoteException " +
+                                                    "when calling getState on " + tr + ", this " +
+                                                    "was " + failCnt + " RemoteException, " +
+                                                    "dropping active transaction " + tr.id, e );
+                        }
+
+                        try
+                        {
+                            monitor.space().abort( tr.mgr, tr.id );
+                            return true;
+                        }
+                        catch( UnknownTransactionException ute )
+                        {
+                            throw new AssertionError( ute );
+                        }
+                        catch( UnmarshalException ume )
+                        {
+                            throw new AssertionError( ume );
+                        }
+                    case PREPARED:
+                        final Level l = ( failCnt % MAX_FAILURES == 0 ) ?
+                                        Level.INFO : Levels.FAILED;
+                        if( logger.isLoggable( l ) )
+                        {
+                            logger.log( l, "Got RemoteException when calling " +
+                                           "getState on " + tr + ", this was " +
+                                           failCnt + " RemoteException, will keep " +
+                                           "prepared transaction " + tr.id, e );
+                        }
+
+                        // Can't give up, keep on trying to find real state
+                        mustQuery = true;
+                        return false;
+                    case ABORTED:
+                    case COMMITTED:
+                        // done
+                        return true;
+                    default:
+                        throw new AssertionError( "Txn in unreachable state" );
+                    }
+                }
+            }
+            else
+            {
+                // Don't know, but not ready to give up
+                if( logger.isLoggable( Levels.FAILED ) )
+                {
+                    logger.log( Levels.FAILED, "Got RemoteException when " +
+                                               "calling getState on " + tr + ", failCount = " +
+                                               failCnt + ", will retry", e );
+                }
+
+                mustQuery = true;      // keep on trying
+                return false;           // try again next time
+            }
+        }
+
+        if( logger.isLoggable( Level.FINER ) )
+        {
+            logger.log( Level.FINER, "{0} trState = {1}",
+                        new Object[]{ this, new Integer( trState ) } );
+        }
+
+        failCnt = 0;               // reset failures -- we got a response
+
+        /*
+       * If the two states aren't the same, the state changed and we
+       * need to account for that locally here by calling the method
+       * that would make the change (the one we should have gotten.
+       * (We use the external forms of abort, commit, etc., because
+       * they are what the manager would call, and therefore these
+       * calls will always do exactly what the incoming manager
+       * calls would have done.  I don't want this to be fragile by
+       * bypassing those calls and going straight to the Txn
+       * object's calls, which might skip something important in the
+       * OutriggerServerImpl calls).
+       */
+
+        if( trState != txnState )
+        {
+            if( logger.isLoggable( Level.FINER ) )
+            {
+                logger.log( Level.FINER,
+                            "{0} mgr state[{1}] != local state [{2}]",
+                            new Object[]{ this,
+                                          TxnConstants.getName( trState ),
+                                          TxnConstants.getName( txnState ) } );
+            }
+
+            try
+            {
+                switch( trState )
+                {
+                case ABORTED:
+                    logger.log( Level.FINER, "{0} moving to abort", this );
+
+                    monitor.space().abort( tr.mgr, tr.id );
+                    return true;
+
+                case COMMITTED:
+                    logger.log( Level.FINER, "{0} moving to commit", this );
+
+                    monitor.space().commit( tr.mgr, tr.id );
+                    return true;
+                }
+            }
+            catch( UnknownTransactionException e )
+            {
+                // we must somehow have already gotten the abort() or
+                // commit(), and have therefore forgotten about the
+                // transaction, while this code was executing
+                return true;
+            }
+            catch( UnmarshalException ume )
+            {
+                throw new AssertionError( ume );
+            }
+
+            // we can't fake anything else -- the manager will have to call
+            // us
+        }
+
+        logger.log( Level.FINEST, "{0} return false", this );
+
+        return false;            // now we know so nothing more to do
+    }
+
+    /**
+     * Add in a resource.  The lease may already be in, in which case it is
+     * ignored, or it may be null, in which case it was a non-leased probe
+     * that was blocked and we simply set <code>mustQuery</code> to
+     * <code>true</code>.
+     */
+    synchronized void add( QueryWatcher query )
+    {
+        if( query == null || query.getExpiration() <= nextQuery )
+        {
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "adding resource to task -- SHORT" );
+            }
+            mustQuery = true;
+        }
+        else
+        {
+            if( logger.isLoggable( Level.FINEST ) )
+            {
+                logger.log( Level.FINEST, "adding resource to task -- LONG" );
+            }
+            if( queries == null )
+            {
+                queries = new WeakHashMap();// we use it like a WeakHashSet
+            }
+            queries.put( query, null );
+        }
+    }
+
+    /**
+     * Log failed unpacking attempt attempt
+     */
+    private void logUnpackingFailure( String exceptionDescription, Level level,
+                                      boolean terminal, Throwable t )
+    {
+        if( logger.isLoggable( level ) )
+        {
+            logger.log( level, "Encountered " + exceptionDescription +
+                               "while unpacking exception to check state, " +
+                               ( terminal ? "dropping" : "keeping" ) + " monitoring task", t );
+        }
+    }
+
+}

Added: incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java
URL: http://svn.apache.org/viewvc/incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java?rev=724979&view=auto
==============================================================================
--- incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java (added)
+++ incubator/river/jtsk/skunk/niclas1/services/outrigger/src/main/java/com/sun/jini/outrigger/TxnState.java Tue Dec  9 21:13:53 2008
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.sun.jini.outrigger;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import net.jini.core.transaction.server.TransactionConstants;
+import net.jini.space.InternalSpaceException;
+
+/**
+ * Class that manages transaction related state on behalf of
+ * <code>EntryHandle</code>s. Can accommodate entries locked by
+ * more than one transaction.  The synchronization of this object is
+ * managed by the <code>EntryHandle</code> that owns it.
+ *
+ * @author Sun Microsystems, Inc.
+ * @see EntryHandle
+ */
+class TxnState
+{
+    /**
+     * The list of known managers.  In order to keep things small in the
+     * common case that there is only one known manager, <code>mgrs</code>
+     * is managed as a ``list'' in one of two states -- it is either a
+     * direct reference to the only manager for this handle, or a reference
+     * to an <code>HashSet</code> with entries for each associated manager.
+     */
+    private Object mgrs;
+
+    /**
+     * The current state of the handle, such as <code>READ</code> or
+     * <code>TAKE</code>.
+     */
+    private int state;
+
+    /**
+     * The holder the handle which owns this object is in
+     */
+    final private EntryHolder holder;
+
+    /**
+     * Logger for logging information about entry matching
+     */
+    private static final Logger matchingLogger =
+        Logger.getLogger( OutriggerServerImpl.matchingLoggerName );
+
+    /**
+     * Logger for logging transaction related information
+     */
+    private static final Logger txnLogger =
+        Logger.getLogger( OutriggerServerImpl.txnLoggerName );
+
+    /**
+     * Create a new <code>TxnState</code>.  It will start initially
+     * with the type of lock indicated by <code>state</code> under the
+     * transaction managed by <code>mgr</code>.  <code>holder</code> is
+     * the holder the associated entry handle is in.
+     */
+    TxnState( TransactableMgr mgr, int state, EntryHolder holder )
+    {
+        this.mgrs = mgr;
+        this.state = state;
+        this.holder = holder;
+        txnLogger.log( Level.FINER, "TxnState: TxnState: state = {0}",
+                       TransactableMgr.stateNames[ state ] );
+    }
+
+    /**
+     * Prepare to commit this object's part of the transaction.  Return
+     * the prepare's status.
+     */
+    int prepare( TransactableMgr mgr, OutriggerServerImpl space,
+                 EntryHandle owner )
+    {
+        txnLogger.log( Level.FINEST, "TxnState: prepare: state = {0}",
+                       TransactableMgr.stateNames[ state ] );
+
+        if( state == TransactableMgr.READ )
+        {
+            final int locksLeft = removeMgr( mgr );
+            /* If there is more than one lock left resolving this lock
+            * does not change anything (takes still can't take the
+            * entry and reads could already) Also need to make sure
+            * it has not be removed (say by expiration or a cancel)
+            * since this would cause to send events for entries that
+            * were removed before it became visible. Also could cause
+            * ifExists queries to hang on to the entry and block
+            * indefinitely.  If they had already send the EntryTransition
+            * for the removal.
+            */
+            if( locksLeft <= 1 && !owner.removed() )
+            {
+                // Has the potential to resolve conflicts/generate events
+                if( locksLeft == 1 )
+                {
+                    space.recordTransition(
+                        new EntryTransition( owner, mgr(), true, false, false ) );
+                }
+                else if( locksLeft == 0 )
+                {
+                    space.recordTransition(
+                        new EntryTransition( owner, null, true, false, false ) );
+                }
+                else
+                {
+                    throw new AssertionError( "Fewer than 0 locks left" );
+                }
+            }
+            return TransactionConstants.NOTCHANGED;
+        }
+
+        return TransactionConstants.PREPARED;
+    }
+
+    /**
+     * Abort this object's part of the transaction.  Return true
+     * if this clears the last transaction associated with this object.
+     */
+    boolean abort( TransactableMgr mgr, OutriggerServerImpl space,
+                   EntryHandle owner )
+    {
+        boolean rslt = true;
+
+        txnLogger.log( Level.FINEST, "TxnState: abort: state = {0}",
+                       TransactableMgr.stateNames[ state ] );
+
+        if( state == TransactableMgr.READ || state == TransactableMgr.TAKE )
+        {
+            final int locksLeft = removeMgr( mgr );
+            rslt = ( locksLeft == 0 );
+            /* If there is more than one lock left resolving this
+            * lock does not change anything (takes still can't
+            * take the entry and reads could already). Also
+            * need to make sure it has not be removed (say
+            * by expiration or a cancel), otherwise
+            * we could log a transition to visible after
+            * the removal was logged, this could cause
+            * ifExists queries to hang on to the entry
+            * and block indefinitely. (This raises an
+            * issue if remote events were sent for re-appearance)
+            */
+            if( locksLeft <= 1 && !owner.removed() )
+            {
+                // Has the potential to resolve conflicts/generate events
+                if( locksLeft == 1 )
+                {
+                    /* There could have only been multiple locks if
+                  * they were all read locks, thus the lock the
+                  * abort resolves must h be a read lock, and the
+                  * remaining lock must be a read lock too.
+                  * Therefore this must be an availability but not a
+                  * visibility transition.
+                  */
+                    assert state == TransactableMgr.READ;
+
+                    space.recordTransition(
+                        new EntryTransition( owner, mgr(), true, false, false ) );
+                }
+                else if( locksLeft == 0 )
+                {
+                    // Only a visibility transition if the lock being
+                    // dropped was a take lock
+                    final boolean visibility = ( state == TransactableMgr.TAKE );
+
+                    space.recordTransition(
+                        new EntryTransition( owner, null, true, visibility,
+                                             false ) );
+                }
+                else
+                {
+                    throw new AssertionError( "Fewer than 0 locks left" );
+                }
+            }
+        }
+        else
+        {
+            /* must be a write, make the entry disappear, will
+            * call recordTransition()
+            */
+            holder.remove( owner, false );
+        }
+
+        return rslt;
+    }
+
+    /**
+     * Commit this object's part of the transaction.  The
+     * <code>space</code> is the <code>OutriggerServerImpl</code> on
+     * which the operation happens -- some commit operations have
+     * space-wide side effects (for example, a commit of a
+     * <code>write</code> operation can cause event notifications for
+     * clients registered under the transaction's parent). Return true
+     * if this clears the last transaction associated with this
+     * object.
+     */
+    boolean commit( TransactableMgr mgr, OutriggerServerImpl space,
+                    EntryHandle owner )
+    {
+        txnLogger.log( Level.FINEST, "TxnState: commit: state = {0}",
+                       TransactableMgr.stateNames[ state ] );
+
+        switch( state )
+        {
+        case TransactableMgr.WRITE:
+            //!! assumption -- single level transactions:
+            //!! we pass null where we should pass the parent txn
+            if( owner.removed() )
+            // if removed() then this entry must
+            // have been taken as well as written under this
+            // transaction.
+            {
+                return true;
+            }
+
+            space.recordTransition(
+                new EntryTransition( owner, null, true, true, true ) );
+            return ( removeMgr( mgr ) == 0 );
+
+        case TransactableMgr.READ:
+            // Read locked entries should never get prepared.
+            throw new InternalSpaceException
+                ( "committing a read locked entry" );
+
+        case TransactableMgr.TAKE:
+            // remove calls recordTransition()
+            holder.remove( owner, false );
+            // Resolves a lock
+            return true; // Take-locked entries only have one Txn
+
+        default:
+            throw new InternalSpaceException( "unexpected state in "
+                                              + "TxnState.commit(): " + state );
+        }
+    }
+
+    /**
+     * Remove the given mgr from the list of known managers.  Return the
+     * number of mgrs still associated with this entry.
+     */
+    private int removeMgr( TransactableMgr mgr )
+    {
+        if( mgrs == null )
+        {
+            return 0;
+        }
+
+        if( mgr == mgrs )
+        {
+            mgrs = null;
+            return 0;
+        }
+
+        final HashSet tab = (HashSet) mgrs;
+        tab.remove( mgr );
+        return tab.size();
+    }
+
+
+    /**
+     * Add <code>mgr</code> to the list of known managers, setting the
+     * state of this handle to <code>op</code>.
+     */
+    void add( TransactableMgr mgr, int op )
+    {
+        if( mgr == mgrs )
+        {
+            return;    // already the only one known
+        }
+
+        if( mgrs instanceof TransactableMgr )
+        {
+            Object origMgr = mgrs;
+            mgrs = new HashSet( 7 );
+            ( (HashSet) mgrs ).add( origMgr );
+        }
+
+        // if mgr is in already this is harmless, and checking to prevent
+        // a redundant addition is more expensive
+        ( (HashSet) mgrs ).add( mgr );
+        monitor( mgr );
+        state = op;
+    }
+
+    /**
+     * If we need to, add this manager to the list of transactions that
+     * need to be monitored because of conflicts over this entry.  Any
+     * existing blocking txn is sufficient.
+     *
+     * @see TxnMonitorTask#addSibling
+     */
+    private void monitor( TransactableMgr mgr )
+    {
+        Txn txn = (Txn) mgr;
+        Iterator it = ( (HashSet) mgrs ).iterator();
+        while( it.hasNext() )
+        {
+            Txn otherTxn = (Txn) it.next();
+            if( otherTxn != mgr && otherTxn.monitorTask() != null )
+            {
+                otherTxn.monitorTask().addSibling( txn );
+                return;
+            }
+        }
+    }
+
+    /**
+     * It this entry is read locked promote to take locked and return
+     * true, otherwise return false.  Assumes the take is being
+     * performed under the one transaction that owns a lock on the
+     * entry.
+     */
+    boolean promoteToTakeIfNeeded()
+    {
+        // We can assume our state is ether WRITE or READ, if it was
+        // take this method would not be called (since take blocks
+        // other takes in the same transaction canPerform() would have
+        // returned false)
+
+        if( state == TransactableMgr.WRITE )
+        {
+            return false;
+        }
+
+        state = TransactableMgr.TAKE;
+        return true;
+    }
+
+    /**
+     * Returns <code>true</code> if the operation <code>op</code>
+     * under the transaction manager by <code>mgr</code> is legal on
+     * the associated entry given the operations already performed on
+     * the entry under other transactions.  It is legal to:
+     * <ul>
+     * <li>Read an entry that has been read in any other transaction,
+     * or that has been written under the same transaction (that is,
+     * if <code>mgr</code> is the same as the writing transaction).
+     * <li>Take an entry that was written under the same transaction, or
+     * which was read <em>only</em> under the same transaction (that is,
+     * no other active transactions have also read it).
+     * </ul>
+     * All other operations are not legal, so <code>canPerform</code>
+     * otherwise returns <code>false</code>.
+     */
+    boolean canPerform( TransactableMgr mgr, int op )
+    {
+        //Note: If two transactions are performed
+        //	within the same TransactableMgr
+        //	(see Txn.java), that means its in
+        //	the same Transaction (see Transaction.java).
+        //      Operations under the same Transaction scope
+        //	are the same as if under a null Transaction
+        //	i.e. all operations are visible.
+        //
+        //	Semantics need to be added for multi-level
+        //	transactions.
+
+        if( matchingLogger.isLoggable( Level.FINER ) )
+        {
+            matchingLogger.log( Level.FINER,
+                                "TxnState: canPerform({0}, {1}): state = {2}",
+                                new Object[]{ mgr, new Integer( op ),
+                                              TransactableMgr.stateNames[ state ] } );
+        }
+
+        switch( state )
+        {
+        case TransactableMgr.READ:
+            if( op == TransactableMgr.READ )
+            {
+                return true;
+            }
+            return onlyMgr( mgr ); // can only modify if I'm the only participant
+        case TransactableMgr.WRITE:
+            if( ( op == TransactableMgr.READ ) ||
+                ( op == TransactableMgr.TAKE ) )
+            {
+                return onlyMgr( mgr );
+            }
+            return false;      // can't read unless I'm the writer
+        case TransactableMgr.TAKE:
+            return false;      // can't take anything taken any time
+        }
+        return false;
+    }
+
+    /**
+     * Return <code>true</code> if <code>mgr</code> is one of the managers
+     * known to be managing this entry.
+     */
+    boolean knownMgr( TransactableMgr mgr )
+    {
+        if( mgr == null )
+        {
+            return false;
+        }
+        if( mgr == mgrs )
+        {
+            return true;
+        }
+        if( mgrs instanceof HashSet )
+        {
+            return ( (HashSet) mgrs ).contains( mgr );
+        }
+        return false;
+    }
+
+    /**
+     * Return <code>true</code> if the given manager is the only one
+     * we know about.
+     */
+    boolean onlyMgr( TransactableMgr mgr )
+    {
+        if( mgr == null )
+        {
+            return false;
+        }
+        if( mgr == mgrs )
+        {
+            return true;
+        }
+        if( mgrs instanceof HashSet )
+        {
+            HashSet tab = (HashSet) mgrs;
+            return ( tab.size() == 1 && tab.contains( mgr ) );
+        }
+        return false;
+    }
+
+    /**
+     * Add all the managers of this transaction to the given
+     * collection.
+     */
+    void addTxns( java.util.Collection collection )
+    {
+        if( mgrs == null )
+        {
+            return;
+        }
+
+        if( mgrs instanceof HashSet )
+        {
+            final HashSet tab = (HashSet) mgrs;
+            collection.addAll( (HashSet) mgrs );
+            return;
+        }
+
+        collection.add( mgrs );
+    }
+
+    /**
+     * Return true if there are no more transactions associated with
+     * this object.
+     */
+    boolean empty()
+    {
+        if( mgrs == null )
+        {
+            return true;
+        }
+
+        if( mgrs instanceof HashSet )
+        {
+            return ( (HashSet) mgrs ).isEmpty();
+        }
+
+        return false;
+    }
+
+    /**
+     * Used by mgr()
+     */
+    final private TransactableMgr[] rslt = new TransactableMgr[1];
+
+    /**
+     * Returns the one manager associated with this transaction.
+     * Throws an AssertionError if there is more or fewer than one
+     * manager associated with this transaction.
+     */
+    private TransactableMgr mgr()
+    {
+        if( mgrs == null )
+        {
+            throw new AssertionError
+                ( "mgr() called on a TxnState with no manager" );
+        }
+
+        if( mgrs instanceof HashSet )
+        {
+            final HashSet tab = (HashSet) mgrs;
+            if( tab.size() != 1 )
+            {
+                throw new AssertionError
+                    ( "mgr() called on TxnState with more than one manager" );
+            }
+            tab.toArray( rslt );
+            return rslt[ 0 ];
+        }
+
+        return (TransactableMgr) mgrs;
+    }
+}



Mime
View raw message