geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [46/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:23:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedLockManager.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedLockManager.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedLockManager.java
deleted file mode 100644
index 5519e00..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedLockManager.java
+++ /dev/null
@@ -1,742 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.ChannelException;
-import com.gemstone.org.jgroups.blocks.VotingAdapter.FailureVoteResult;
-import com.gemstone.org.jgroups.blocks.VotingAdapter.VoteResult;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Rsp;
-import com.gemstone.org.jgroups.util.RspList;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-/**
- * Distributed lock manager is responsible for maintaining the lock information
- * consistent on all participating nodes.
- * 
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- * @author Robert Schaffar-Taurok (robert@fusion.at)
- * @version $Id: DistributedLockManager.java,v 1.6 2005/06/08 15:56:54 publicnmi Exp $
- */
-public class DistributedLockManager implements TwoPhaseVotingListener, LockManager, VoteResponseProcessor {
-    /**
-     * Definitions for the implementation of the VoteResponseProcessor
-     */
-    private static final int PROCESS_CONTINUE = 0;
-    private static final int PROCESS_SKIP = 1;
-    private static final int PROCESS_BREAK = 2;
-
-    /**
-     * This parameter means that lock acquisition expires after 5 seconds.
-     * If there were no "commit" operation on prepared lock, then it
-     * is treated as expired and is removed from the prepared locks table.
-     */
-    private static final long ACQUIRE_EXPIRATION = 5000;
-    
-    /**
-     * This parameter is used during lock releasing. If group fails to release
-     * the lock during the specified period of time, unlocking fails.
-     */
-    private static final long VOTE_TIMEOUT = 10000;
-
-	// list of all prepared locks
-	private final HashMap preparedLocks = new HashMap();
-
-	// list of all prepared releases
-	private final HashMap preparedReleases = new HashMap();
-
-	// list of locks on the node
-	private final HashMap heldLocks = new HashMap();
-
-	private final TwoPhaseVotingAdapter votingAdapter;
-
-	private final Object id;
-
-    protected final GemFireTracer log=GemFireTracer.getLog(getClass());
-
-    // @todo check if the node with the same id is already in the group.
-    /**
-     * Create instance of this class.
-     * 
-     * @param voteChannel instance of {@link VotingAdapter} that will be used 
-     * for voting purposes on the lock decrees. <tt>voteChannel()</tt> will
-     * be wrapped by the instance of the {@link TwoPhaseVotingAdapter}.
-     * 
-     * @param id the unique identifier of this lock manager.
-     */
-    public DistributedLockManager(VotingAdapter voteChannel, Object id) {
-        this(new TwoPhaseVotingAdapter(voteChannel), id);
-    }
-
-    // @todo check if the node with the same id is already in the group.
-    /**
-     *  Constructor for the DistributedLockManager_cl object.
-     * 
-     *  @param channel instance of {@link TwoPhaseVotingAdapter}
-     *  that will be used for voting purposes on the lock decrees.
-     * 
-     *  @param id the unique identifier of this lock manager.
-     */
-    public DistributedLockManager(TwoPhaseVotingAdapter channel, Object id) {
-        this.id = id;
-        this.votingAdapter = channel;
-        this.votingAdapter.addListener(this);
-    }
-
-    /**
-     * Performs local lock. This method also performs the clean-up of the lock
-     * table, all expired locks are removed.
-     */
-    private boolean localLock(LockDecree lockDecree) {
-        // remove expired locks
-        removeExpired(lockDecree);
-
-        LockDecree localLock =
-            (LockDecree) heldLocks.get(lockDecree.getKey());
-
-        if (localLock == null) {
-
-            // promote lock into commited state
-            lockDecree.commit();
-
-            // no lock exist, perform local lock, note:
-            // we do not store locks that were requested by other manager.
-            if (lockDecree.managerId.equals(id))
-                heldLocks.put(lockDecree.getKey(), lockDecree);
-
-            // everything is fine :)
-            return true;
-        } else
-        if (localLock.requester.equals(lockDecree.requester))
-            // requester already owns the lock
-            return true;
-        else
-            // lock does not belong to requester
-            return false;
-
-    }
-
-    /**
-     * Returns <code>true</code> if the requested lock can be granted by the
-     * current node.
-     * 
-     * @param decree instance of <code>LockDecree</code> containing information
-     * about the lock.
-     */
-    private boolean canLock(LockDecree decree) {
-        // clean expired locks
-        removeExpired(decree);
-
-        LockDecree lock = (LockDecree)heldLocks.get(decree.getKey());
-        if (lock == null)
-            return true;
-        else
-            return lock.requester.equals(decree.requester);
-    }
-
-    /**
-     * Returns <code>true</code> if the requested lock can be released by the
-     * current node.
-     * 
-     * @param decree instance of {@link LockDecree} containing information
-     * about the lock.
-     */
-    private boolean canRelease(LockDecree decree) {
-        // clean expired locks
-        removeExpired(decree);
-
-        // we need to check only hold locks, because
-        // prepared locks cannot contain the lock
-        LockDecree lock = (LockDecree)heldLocks.get(decree.getKey());
-        if (lock == null)
-            // check if this holds...
-            return true;
-        else
-            return lock.requester.equals(decree.requester);
-    }
-
-    /**
-     * Removes expired locks.
-     * 
-     * @param decree instance of {@link LockDecree} describing the lock.
-     */
-    private void removeExpired(LockDecree decree) {
-        // remove the invalid (expired) lock
-        LockDecree localLock = (LockDecree)heldLocks.get(decree.getKey());
-        if (localLock != null && !localLock.isValid())
-            heldLocks.remove(localLock.getKey());
-    }
-
-    /**
-     * Releases lock locally.
-     * 
-     * @param lockDecree instance of {@link LockDecree} describing the lock.
-     */
-    private boolean localRelease(LockDecree lockDecree) {
-        // remove expired locks
-        removeExpired(lockDecree);
-
-        LockDecree localLock=
-                (LockDecree) heldLocks.get(lockDecree.getKey());
-
-        if(localLock == null) {
-            // no lock exist
-            return true;
-        }
-        else if(localLock.requester.equals(lockDecree.requester)) {
-            // requester owns the lock, release the lock
-            heldLocks.remove(lockDecree.getKey());
-            return true;
-        }
-        else
-        // lock does not belong to requester
-            return false;
-    }
-
-    /**
-     * Locks an object with <code>lockId</code> on behalf of the specified
-     * <code>owner</code>.
-     * 
-     * @param lockId <code>Object</code> representing the object to be locked.
-     * @param owner object that requests the lock.
-     * @param timeout time during which group members should decide
-     * whether to grant a lock or not.
-     *
-     * @throws LockNotGrantedException when the lock cannot be granted.
-     * 
-     * @throws ClassCastException if lockId or owner are not serializable.
-     * 
-     * @throws ChannelException if something bad happened to underlying channel.
-     */
-    public void lock(Object lockId, Object owner, int timeout)
-        throws LockNotGrantedException, ChannelException
-    {
-        if (!(lockId instanceof Serializable) || !(owner instanceof Serializable))
-            throw new ClassCastException("DistributedLockManager " +
-                "works only with serializable objects.");
-
-        boolean acquired = votingAdapter.vote(
-            new AcquireLockDecree(lockId, owner, id), timeout);
-
-        if (!acquired)
-            throw new LockNotGrantedException("Lock cannot be granted.");
-    }
-
-    /**
-     * Unlocks an object with <code>lockId</code> on behalf of the specified
-     * <code>owner</code>.
-     * 
-     * since 2.2.9 this method is only a wrapper for 
-     * unlock(Object lockId, Object owner, boolean releaseMultiLocked).
-     * Use that with releaseMultiLocked set to true if you want to be able to
-     * release multiple locked locks (for example after a merge)
-     * 
-     * @param lockId <code>long</code> representing the object to be unlocked.
-     * @param owner object that releases the lock.
-     *
-     * @throws LockNotReleasedException when the lock cannot be released.
-     * @throws ClassCastException if lockId or owner are not serializable.
-     * 
-     */
-    public void unlock(Object lockId, Object owner)
-        throws LockNotReleasedException, ChannelException
-    {
-        try {
-            unlock(lockId, owner, false);
-        } catch (LockMultiLockedException e) {
-            // This should never happen when releaseMultiLocked is false
-            log.error(ExternalStrings.DistributedLockManager_CAUGHT_MULTILOCKEDEXCEPTION_BUT_RELEASEMULTILOCKED_IS_FALSE, e);
-        }
-    }
-
-    /**
-     * Unlocks an object with <code>lockId</code> on behalf of the specified
-     * <code>owner</code>.
-     * @param lockId <code>long</code> representing the object to be unlocked.
-     * @param owner object that releases the lock.
-     * @param releaseMultiLocked releases also multiple locked locks. (eg. locks that are locked by another DLM after a merge)
-     *
-     * @throws LockNotReleasedException when the lock cannot be released.
-     * @throws ClassCastException if lockId or owner are not serializable.
-     * @throws LockMultiLockedException if releaseMultiLocked is true and a multiple locked lock has been released.
-     */
-    public void unlock(Object lockId, Object owner, boolean releaseMultiLocked)
-        throws LockNotReleasedException, ChannelException, LockMultiLockedException
-    {
-
-        if (!(lockId instanceof Serializable) || !(owner instanceof Serializable))
-            throw new ClassCastException("DistributedLockManager " +
-                "works only with serializable objects.");
-
-        ReleaseLockDecree releaseLockDecree = new ReleaseLockDecree(lockId, owner, id);
-        boolean released = false;
-        if (releaseMultiLocked) {
-            released = votingAdapter.vote(releaseLockDecree, VOTE_TIMEOUT, this);
-            if (releaseLockDecree.isMultipleLocked()) {
-                throw new LockMultiLockedException("Lock was also locked by other DistributedLockManager(s)");
-            }
-        } else {
-            released = votingAdapter.vote(releaseLockDecree, VOTE_TIMEOUT);
-        }
-        
-        if (!released)
-            throw new LockNotReleasedException("Lock cannot be unlocked.");
-    }
-
-    /**
-     * Checks the list of prepared locks/unlocks to determine if we are in the
-     * middle of the two-phase commit process for the lock acqusition/release.
-     * Here we do not tolerate if the request comes from the same node on behalf
-     * of the same owner.
-     * 
-     * @param preparedContainer either <code>preparedLocks</code> or
-     * <code>preparedReleases</code> depending on the situation.
-     * 
-     * @param requestedDecree instance of <code>LockDecree</code> representing
-     * the lock.
-     */
-    private boolean checkPrepared(HashMap preparedContainer, 
-        LockDecree requestedDecree)
-    {
-        LockDecree preparedDecree =
-            (LockDecree)preparedContainer.get(requestedDecree.getKey());
-
-        // if prepared lock is not valid, remove it from the list
-        if ((preparedDecree != null) && !preparedDecree.isValid()) {
-            preparedContainer.remove(preparedDecree.getKey());
-
-            preparedDecree = null;
-        }
-
-        if (preparedDecree != null) {
-            if (requestedDecree.requester.equals(preparedDecree.requester))
-                return true;
-            else
-                return false;
-        } else
-            // it was not prepared... sorry...
-            return true;
-    }
-
-    /**
-     * Prepare phase for the lock acquisition or release.
-     * 
-     * @param decree should be an instance <code>LockDecree</code>, if not,
-     * we throw <code>VoteException</code> to be ignored by the
-     * <code>VoteChannel</code>.
-     * 
-     * @return <code>true</code> when preparing the lock operation succeeds.
-     * 
-     * @throws VoteException if we should be ignored during voting.
-     */
-    public synchronized boolean prepare(Object decree) throws VoteException {
-        if (!(decree instanceof LockDecree))
-            throw new VoteException("Uknown decree type. Ignore me.");
-            
-        if (decree instanceof AcquireLockDecree) {
-            AcquireLockDecree acquireDecree = (AcquireLockDecree)decree;
-            if(log.isDebugEnabled()) log.debug("Preparing to acquire decree " + acquireDecree.lockId);
-            
-            if (!checkPrepared(preparedLocks, acquireDecree))
-                // there is a prepared lock owned by third party
-                return false;
-
-            if (canLock(acquireDecree)) {
-                preparedLocks.put(acquireDecree.getKey(), acquireDecree);
-                return true;
-            } else
-                // we are unable to aquire local lock
-                return false;
-        } else
-        if (decree instanceof ReleaseLockDecree) {
-            ReleaseLockDecree releaseDecree = (ReleaseLockDecree)decree;
-            
-
-                if(log.isDebugEnabled()) log.debug("Preparing to release decree " + releaseDecree.lockId);
-
-            if (!checkPrepared(preparedReleases, releaseDecree))
-                // there is a prepared release owned by third party
-                return false;
-
-            if (canRelease(releaseDecree)) {
-                preparedReleases.put(releaseDecree.getKey(), releaseDecree);
-                // we have local lock and the prepared lock
-                return true;
-            } else
-                // we were unable to aquire local lock
-                return false;
-        } else
-        if (decree instanceof MultiLockDecree) {
-            // Here we abuse the voting mechanism for notifying the other lockManagers of multiple locked objects.
-            MultiLockDecree multiLockDecree = (MultiLockDecree)decree;
-            
-            if(log.isDebugEnabled()) {
-                log.debug("Marking " + multiLockDecree.getKey() + " as multilocked");
-            }
-
-            LockDecree lockDecree = (LockDecree)heldLocks.get(multiLockDecree.getKey());
-            if (lockDecree != null) {
-                lockDecree.setMultipleLocked(true);
-            }
-            return true;
-        }
-
-        // we should not be here
-        return false;
-    }
-
-    /**
-     * Commit phase for the lock acquisition or release.
-     * 
-     * @param decree should be an instance <code>LockDecree</code>, if not,
-     * we throw <code>VoteException</code> to be ignored by the
-     * <code>VoteChannel</code>.
-     * 
-     * @return <code>true</code> when commiting the lock operation succeeds.
-     * 
-     * @throws VoteException if we should be ignored during voting.
-     */
-    public synchronized boolean commit(Object decree) throws VoteException {
-        if (!(decree instanceof LockDecree))
-            throw new VoteException("Uknown decree type. Ignore me.");
-
-        if (decree instanceof AcquireLockDecree) {
-            
-
-                if(log.isDebugEnabled()) log.debug("Committing decree acquisition " + ((LockDecree)decree).lockId);
-            
-            if (!checkPrepared(preparedLocks, (LockDecree)decree))
-                // there is a prepared lock owned by third party
-                return false;
-
-            if (localLock((LockDecree)decree)) {
-                preparedLocks.remove(((LockDecree)decree).getKey());
-                return true;
-            } else
-                return false;
-        } else
-        if (decree instanceof ReleaseLockDecree) {
-            
-
-                if(log.isDebugEnabled()) log.debug("Committing decree release " + ((LockDecree)decree).lockId);
-            
-            if (!checkPrepared(preparedReleases, (LockDecree)decree))
-                // there is a prepared release owned by third party
-                return false;
-
-            if (localRelease((LockDecree)decree)) {
-                preparedReleases.remove(((LockDecree)decree).getKey());
-                return true;
-            } else
-                return false;
-        } else
-        if (decree instanceof MultiLockDecree) {
-            return true;
-        }
-
-        // we should not be here
-        return false;
-    }
-
-    /**
-     * Abort phase for the lock acquisition or release.
-     * 
-     * @param decree should be an instance <code>LockDecree</code>, if not,
-     * we throw <code>VoteException</code> to be ignored by the
-     * <code>VoteChannel</code>.
-     * 
-     * @throws VoteException if we should be ignored during voting.
-     */
-    public synchronized void abort(Object decree) throws VoteException {
-        if (!(decree instanceof LockDecree))
-            throw new VoteException("Uknown decree type. Ignore me.");
-
-        if (decree instanceof AcquireLockDecree) {
-            
-
-                if(log.isDebugEnabled()) log.debug("Aborting decree acquisition " + ((LockDecree)decree).lockId);
-            
-            if (!checkPrepared(preparedLocks, (LockDecree)decree))
-                // there is a prepared lock owned by third party
-                return;
-
-            preparedLocks.remove(((LockDecree)decree).getKey());
-        } else
-        if (decree instanceof ReleaseLockDecree) {
-            
-
-                if(log.isDebugEnabled()) log.debug("Aborting decree release " + ((LockDecree)decree).lockId);
-            
-            if (!checkPrepared(preparedReleases, (LockDecree)decree))
-                // there is a prepared release owned by third party
-                return;
-
-            preparedReleases.remove(((LockDecree)decree).getKey());
-        }
-
-    }
-
-    
-    /**
-     * Processes the response list and votes like the default processResponses method with the consensusType VOTE_ALL
-     * If the result of the voting is false, but this DistributedLockManager owns the lock, the result is changed to
-     * true and the lock is released, but marked as multiple locked. (only in the prepare state to reduce traffic)
-     * <p>
-     * Note: we do not support voting in case of Byzantine failures, i.e.
-     * when the node responds with the fault message.
-     */
-    public boolean processResponses(RspList responses, int consensusType, Object decree) throws ChannelException {
-        if (responses == null) {
-            return false;
-        }
-
-        int totalPositiveVotes = 0;
-        int totalNegativeVotes = 0;
-
-        for (int i = 0; i < responses.size(); i++) {
-            Rsp response = (Rsp) responses.elementAt(i);
-
-            switch (checkResponse(response)) {
-                case PROCESS_SKIP:
-                    continue;
-                case PROCESS_BREAK:
-                    return false;
-            }
-
-            VoteResult result = (VoteResult) response.getValue();
-
-            totalPositiveVotes += result.getPositiveVotes();
-            totalNegativeVotes += result.getNegativeVotes();
-        }
-
-        boolean voteResult = (totalNegativeVotes == 0 && totalPositiveVotes > 0);
-
-        if (decree instanceof TwoPhaseVotingAdapter.TwoPhaseWrapper) {
-            TwoPhaseVotingAdapter.TwoPhaseWrapper wrappedDecree = (TwoPhaseVotingAdapter.TwoPhaseWrapper)decree;
-            if (wrappedDecree.isPrepare()) {
-	            Object unwrappedDecree = wrappedDecree.getDecree();
-	            if (unwrappedDecree instanceof ReleaseLockDecree) {
-	                ReleaseLockDecree releaseLockDecree = (ReleaseLockDecree)unwrappedDecree;
-	                LockDecree lock = null;
-	                if ((lock = (LockDecree)heldLocks.get(releaseLockDecree.getKey())) != null) {
-	                    // If there is a local lock...
-	                    if (!voteResult) {
-	                        // ... and another DLM voted negatively, but this DLM owns the lock
-	                        // we inform the other node, that it's lock is multiple locked
-	                        if (informLockingNodes(releaseLockDecree)) {
-	                        
-		                        // we set the local lock to multiple locked
-		                        lock.setMultipleLocked(true);
-		                        
-		                        voteResult = true;
-	                        }
-	                    }
-	                    if (lock.isMultipleLocked()) {
-	                        //... and the local lock is marked as multilocked
-	                        // we mark the releaseLockDecree als multiple locked for evaluation when unlock returns
-	                        releaseLockDecree.setMultipleLocked(true);
-	                    }
-	                }
-	            }
-            }
-        }
-
-        return voteResult;
-    }
-
-    /**
-     * This method checks the response and says the processResponses() method
-     * what to do.
-     * @return PROCESS_CONTINUE to continue calculating votes,
-     * PROCESS_BREAK to stop calculating votes from the nodes,
-     * PROCESS_SKIP to skip current response.
-     * @throws ChannelException when the response is fatal to the
-     * current voting process.
-     */
-    private int checkResponse(Rsp response) throws ChannelException {
-
-        if (!response.wasReceived()) {
-
-            if (log.isDebugEnabled())
-                log.debug("Response from node " + response.getSender() + " was not received.");
-
-            throw new ChannelException("Node " + response.getSender() + " failed to respond.");
-        }
-
-        if (response.wasSuspected()) {
-
-            if (log.isDebugEnabled())
-                log.debug("Node " + response.getSender() + " was suspected.");
-
-            return PROCESS_SKIP;
-        }
-
-        Object object = response.getValue();
-
-        // we received exception/error, something went wrong
-        // on one of the nodes... and we do not handle such faults
-        if (object instanceof Throwable) {
-            throw new ChannelException("Node " + response.getSender() + " is faulty.");
-        }
-
-        if (object == null) {
-            return PROCESS_SKIP;
-        }
-
-        // it is always interesting to know the class that caused failure...
-        if (!(object instanceof VoteResult)) {
-            String faultClass = object.getClass().getName();
-
-            // ...but we do not handle byzantine faults
-            throw new ChannelException("Node " + response.getSender() + " generated fault (class " + faultClass + ')');
-        }
-
-        // what if we received the response from faulty node?
-        if (object instanceof FailureVoteResult) {
-
-            if (log.isErrorEnabled())
-                log.error(ExternalStrings.DistributedLockManager_0, ((FailureVoteResult) object).getReason());
-
-            return PROCESS_BREAK;
-        }
-
-        // everything is fine :)
-        return PROCESS_CONTINUE;
-    }
-    
-    private boolean informLockingNodes(ReleaseLockDecree releaseLockDecree) throws ChannelException {
-        return votingAdapter.vote(new MultiLockDecree(releaseLockDecree), VOTE_TIMEOUT);
-    }
-    
-    /**
-     * This class represents the lock
-     */
-    public static class LockDecree implements Serializable {
-        private static final long serialVersionUID = -7422058144576778340L;
-
-        protected final Object lockId;
-        protected final Object requester;
-        protected final Object managerId;
-
-        protected boolean commited;
-        
-        private boolean multipleLocked = false;
-
-        protected LockDecree(Object lockId, Object requester, Object managerId) {
-            this.lockId = lockId;
-            this.requester = requester;
-            this.managerId = managerId;
-        }
-
-        /**
-         * Returns the key that should be used for Map lookup.
-         */
-        public Object getKey() { return lockId; }
-
-        /**
-         * This is a place-holder for future lock expiration code.
-         */
-        public boolean isValid() { return true; }
-
-        public void commit() { this.commited = true; }
-
-        /**
-         * @return Returns the multipleLocked.
-         */
-        public boolean isMultipleLocked() {
-            return multipleLocked;
-        }
-        /**
-         * @param multipleLocked The multipleLocked to set.
-         */
-        public void setMultipleLocked(boolean multipleLocked) {
-            this.multipleLocked = multipleLocked;
-        }
-        /**
-         * This is hashcode from the java.lang.Long class.
-         */
-        @Override // GemStoneAddition
-        public int hashCode() {
-            return lockId.hashCode();
-        }
-
-        @Override // GemStoneAddition
-        public boolean equals(Object other) {
-
-            if (other instanceof LockDecree) {
-                return ((LockDecree)other).lockId.equals(this.lockId);
-            } else {
-                return false;
-            }
-        }
-    }
-
-
-    /**
-     * This class represents the lock to be released.
-     */
-    public static class AcquireLockDecree extends LockDecree  {
-        private static final long serialVersionUID = 7608853900623293300L;
-        private final long creationTime;
-
-        protected AcquireLockDecree(Object lockId, Object requester, Object managerId) {
-            super(lockId, requester, managerId);
-            this.creationTime = System.currentTimeMillis();
-        }
-
-        @Override
-        public boolean equals(Object o) { // GemStoneAddition for findbugs
-          return super.equals(o);
-        }
-        
-        @Override
-        public int hashCode() { // GemStoneAddition for findbugs
-          return super.hashCode();
-        }
-        
-        /**
-         * Lock aquire decree is valid for a <code>ACQUIRE_EXPIRATION</code>
-         * time after creation and if the lock is still valid (in the
-         * future locks will be leased for a predefined period of time).
-         */
-        @Override // GemStoneAddition
-        public boolean isValid() {
-            boolean result =  super.isValid();
-
-            if (!commited && result)
-                result = ((creationTime + ACQUIRE_EXPIRATION) > System.currentTimeMillis());
-
-            return result;
-        }
-
-    }
-
-    /**
-     * This class represents the lock to be released.
-     */
-    public static class ReleaseLockDecree extends LockDecree  {
-        private static final long serialVersionUID = -159320406385342426L;
-        ReleaseLockDecree(Object lockId, Object requester, Object managerId) {
-            super(lockId, requester, managerId);
-        }
-    }
-    
-    /**
-     * This class represents the lock that has to be marked as multilocked 
-     */
-    public static class MultiLockDecree extends LockDecree  {
-        private static final long serialVersionUID = -8775726661815938941L;
-        MultiLockDecree(Object lockId, Object requester, Object managerId) {
-            super(lockId, requester, managerId);
-        }
-
-        MultiLockDecree(ReleaseLockDecree releaseLockDecree) {
-            super(releaseLockDecree.lockId, releaseLockDecree.requester, releaseLockDecree.managerId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedQueue.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedQueue.java
deleted file mode 100644
index 8cf33ac..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedQueue.java
+++ /dev/null
@@ -1,758 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: DistributedQueue.java,v 1.16 2005/07/17 11:36:40 chrislott Exp $
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.*;
-
-
-/**
- * Provides the abstraction of a java.util.LinkedList that is replicated at several
- * locations. Any change to the list (reset, add, remove, etc.) will transparently be
- * propagated to all replicas in the group. All read-only methods will always access the
- * local replica.<p>
- * Both keys and values added to the list <em>must be serializable</em>, the reason
- * being that they will be sent across the network to all replicas of the group.
- * An instance of this class will contact an existing member of the group to fetch its
- * initial state.
- * Beware to use a <em>total protocol</em> on initialization or elements would not be in same
- * order on all replicas.
- * @author Romuald du Song
- */
-public class DistributedQueue implements MessageListener, MembershipListener, Cloneable
-{
-    public interface Notification
-    {
-        void entryAdd(Object value);
-
-        void entryRemoved(Object key);
-
-        void viewChange(Vector new_mbrs, Vector old_mbrs);
-
-        void contentsCleared();
-
-        void contentsSet(Collection new_entries);
-    }
-
-    protected GemFireTracer logger = GemFireTracer.getLog(getClass());
-    private long internal_timeout = 10000; // 10 seconds to wait for a response
-
-    /*lock object for synchronization*/
-    protected Object mutex = new Object();
-    protected transient boolean stopped = false; // whether to we are stopped !
-    protected LinkedList internalQueue;
-    protected transient Channel channel;
-    protected transient RpcDispatcher disp = null;
-    protected transient String groupname = null;
-    protected transient Vector notifs = new Vector(); // to be notified when mbrship changes
-    protected transient Vector members = new Vector(); // keeps track of all DHTs
-    private transient Class[] add_signature = null;
-    private transient Class[] addAtHead_signature = null;
-    private transient Class[] addAll_signature = null;
-    private transient Class[] reset_signature = null;
-    private transient Class[] remove_signature = null;
-    
-    /**
-     * Creates a DistributedQueue
-     * @param groupname The name of the group to join
-     * @param factory The ChannelFactory which will be used to create a channel
-     * @param properties The property string to be used to define the channel
-     * @param state_timeout The time to wait until state is retrieved in milliseconds. A value of 0 means wait forever.
-     */
-    public DistributedQueue(String groupname, ChannelFactory factory, String properties, long state_timeout)
-                     throws ChannelException
-    {
-        if (logger.isDebugEnabled())
-        {
-            logger.debug("DistributedQueue(" + groupname + ',' + properties + ',' + state_timeout);
-        }
-
-        this.groupname = groupname;
-        initSignatures();
-        internalQueue = new LinkedList();
-        channel = (factory != null) ? factory.createChannel(properties) : new JChannel(properties);
-        disp = new RpcDispatcher(channel, this, this, this);
-        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
-        channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-        channel.connect(groupname);
-        start(state_timeout);
-    }
-
-    public DistributedQueue(JChannel channel)
-    {
-        this.groupname = channel.getChannelName();
-        this.channel = channel;
-        init();
-    }
-
-    /**
-      * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
-      * used to register under that id. This is typically used when another building block is already using
-      * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
-      * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
-      * first block created on PullPushAdapter.
-      * The caller needs to call start(), before using the this block. It gives the opportunity for the caller
-      * to register as a lessoner for Notifications events.
-      * @param adapter The PullPushAdapter which to use as underlying transport
-      * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
-      *           requests/responses for different building blocks on top of PullPushAdapter.
-      */
-    public DistributedQueue(PullPushAdapter adapter, Serializable id)
-    {
-        this.channel = (Channel)adapter.getTransport();
-        this.groupname = this.channel.getChannelName();
-
-        initSignatures();
-        internalQueue = new LinkedList();
-
-        channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-        disp = new RpcDispatcher(adapter, id, this, this, this);
-        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
-    }
-
-    protected void init()
-    {
-        initSignatures();
-        internalQueue = new LinkedList();
-        channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-        disp = new RpcDispatcher(channel, this, this, this);
-        disp.setDeadlockDetection(false); // To ensure strict FIFO MethodCall
-    }
-
-    public void start(long state_timeout) throws ChannelClosedException, ChannelNotConnectedException
-    {
-        boolean rc;
-        logger.debug("DistributedQueue.initState(" + groupname + "): starting state retrieval");
-
-        rc = channel.getState(null, state_timeout);
-
-        if (rc)
-        {
-            logger.info(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUEINITSTATE_0__STATE_WAS_RETRIEVED_SUCCESSFULLY, groupname);
-        }
-        else
-        {
-            logger.info(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUEINITSTATE_0__STATE_COULD_NOT_BE_RETRIEVED_FIRST_MEMBER, groupname);
-        }
-    }
-
-    public Address getLocalAddress()
-    {
-        return (channel != null) ? channel.getLocalAddress() : null;
-    }
-
-    public Channel getChannel()
-    {
-        return channel;
-    }
-
-    public void addNotifier(Notification n)
-    {
-        if (n != null && !notifs.contains(n))
-        {
-            notifs.addElement(n);
-        }
-    }
-
-    public void removeNotifier(Notification n)
-    {
-        notifs.removeElement(n);
-    }
-
-    public void stop()
-    {
-        /*lock the queue from other threads*/
-        synchronized (mutex)
-        {
-            internalQueue.clear();
-
-            if (disp != null)
-            {
-                disp.stop();
-                disp = null;
-            }
-
-            if (channel != null)
-            {
-                channel.close();
-                channel = null;
-            }
-
-            stopped = true;
-        }
-    }
-
-    /**
-     * Add the speficied element at the bottom of the queue
-     * @param value
-     */
-    public void add(Object value)
-    {
-        try
-        {
-            Object retval = null;
-
-            RspList rsp = disp.callRemoteMethods(null, "_add", new Object[]{value}, add_signature, GroupRequest.GET_ALL, 0);
-            Vector results = rsp.getResults();
-
-            if (results.size() > 0)
-            {
-                retval = results.elementAt(0);
-
-                if (logger.isDebugEnabled())
-                {
-                    checkResult(rsp, retval);
-                }
-            }
-        }
-         catch (Exception e)
-        {
-            logger.error(ExternalStrings.DistributedQueue_UNABLE_TO_ADD_VALUE__0, value, e);
-        }
-
-        return;
-    }
-
-    /**
-     * Add the specified element at the top of the queue
-     * @param value
-     */
-    public void addAtHead(Object value)
-    {
-        try
-        {
-            disp.callRemoteMethods(null, "_addAtHead", new Object[]{value}, addAtHead_signature, GroupRequest.GET_ALL, 0);
-        }
-         catch (Exception e)
-        {
-            logger.error(ExternalStrings.DistributedQueue_UNABLE_TO_ADDATHEAD_VALUE__0, value, e);
-        }
-
-        return;
-    }
-
-    /**
-     * Add the speficied collection to the top of the queue.
-     * Elements are added in the order that they are returned by the specified
-     * collection's iterator.
-     * @param values
-     */
-    public void addAll(Collection values)
-    {
-        try
-        {
-            disp.callRemoteMethods(null, "_addAll", new Object[]{values}, addAll_signature, GroupRequest.GET_ALL, 0);
-        }
-         catch (Exception e)
-        {
-            logger.error(ExternalStrings.DistributedQueue_UNABLE_TO_ADDALL_VALUE__0, values, e);
-        }
-
-        return;
-    }
-
-    public Vector getContents()
-    {
-        Vector result = new Vector();
-
-        for (Iterator e = internalQueue.iterator(); e.hasNext();)
-            result.add(e.next());
-
-        return result;
-    }
-
-    public int size()
-    {
-        return internalQueue.size();
-    }
-
-    /**
-      * returns the first object on the queue, without removing it.
-      * If the queue is empty this object blocks until the first queue object has
-      * been added
-      * @return the first object on the queue
-      */
-    public Object peek()
-    {
-        Object retval = null;
-
-        try
-        {
-            retval = internalQueue.getFirst();
-        }
-         catch (NoSuchElementException e)
-        {
-        }
-
-        return retval;
-    }
-
-    public void reset()
-    {
-        try
-        {
-            disp.callRemoteMethods(null, "_reset", null, reset_signature, GroupRequest.GET_ALL, 0);
-        }
-         catch (Exception e)
-        {
-            logger.error(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUERESET_0, groupname, e);
-        }
-    }
-
-    protected void checkResult(RspList rsp, Object retval)
-    {
-        if (logger.isDebugEnabled())
-        {
-            logger.debug("Value updated from " + groupname + " :" + retval);
-        }
-
-        Vector results = rsp.getResults();
-
-        for (int i = 0; i < results.size(); i++)
-        {
-            Object data = results.elementAt(i);
-
-            if (!data.equals(retval))
-            {
-                logger.error(ExternalStrings.DistributedQueue_REFERENCE_VALUE_DIFFERS_FROM_RETURNED_VALUE__0____1, new Object[] {retval, data});
-            }
-        }
-    }
-
-    /**
-     * Try to return the first objet in the queue.It does not wait for an object.
-     * @return the first object in the queue or null if none were found.
-     */
-    public Object remove()
-    {
-        Object retval = null;
-        RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
-        Vector results = rsp.getResults();
-
-        if (results.size() > 0)
-        {
-            retval = results.elementAt(0);
-
-            if (logger.isDebugEnabled())
-            {
-                checkResult(rsp, retval);
-            }
-        }
-
-        return retval;
-    }
-
-    /**
-     * @param timeout The time to wait until an entry is retrieved in milliseconds. A value of 0 means wait forever.
-     * @return the first object in the queue or null if none were found
-     */
-    public Object remove(long timeout)
-    {
-        Object retval = null;
-        long start = System.currentTimeMillis();
-
-        if (timeout <= 0)
-        {
-            while (!stopped && (retval == null))
-            {
-                RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
-                Vector results = rsp.getResults();
-
-                if (results.size() > 0)
-                {
-                    retval = results.elementAt(0);
-
-                    if (logger.isDebugEnabled())
-                    {
-                        checkResult(rsp, retval);
-                    }
-                }
-
-                if (retval == null)
-                {
-                    try
-                    {
-                        synchronized (mutex)
-                        {
-                            mutex.wait();
-                        }
-                    }
-                     catch (InterruptedException e)
-                    {
-                       Thread.currentThread().interrupt(); // GemStoneAddition
-                       return null; // GemStoneAddition - treat as failure
-                    }
-                }
-            }
-        }
-        else
-        {
-            while (((System.currentTimeMillis() - start) < timeout) && !stopped && (retval == null))
-            {
-                RspList rsp = disp.callRemoteMethods(null, "_remove", null, remove_signature, GroupRequest.GET_ALL, internal_timeout);
-                Vector results = rsp.getResults();
-
-                if (results.size() > 0)
-                {
-                    retval = results.elementAt(0);
-
-                    if (logger.isDebugEnabled())
-                    {
-                        checkResult(rsp, retval);
-                    }
-                }
-
-                if (retval == null)
-                {
-                    try
-                    {
-                        long delay = timeout - (System.currentTimeMillis() - start);
-
-                        synchronized (mutex)
-                        {
-                            if (delay > 0)
-                            {
-                                mutex.wait(delay);
-                            }
-                        }
-                    }
-                     catch (InterruptedException e)
-                    {
-                       Thread.currentThread().interrupt(); // GemStoneAddition
-                       break; // GemStoneAddition - treat as timeout
-                    }
-                }
-            }
-        }
-
-        return retval;
-    }
-
-    @Override // GemStoneAddition
-    public String toString()
-    {
-        return internalQueue.toString();
-    }
-
-    /*------------------------ Callbacks -----------------------*/
-    public void _add(Object value)
-    {
-        if (logger.isDebugEnabled())
-        {
-            logger.debug(groupname + '@' + getLocalAddress() + " _add(" + value + ')');
-        }
-
-        /*lock the queue from other threads*/
-        synchronized (mutex)
-        {
-            internalQueue.add(value);
-
-            /*wake up all the threads that are waiting for the lock to be released*/
-            mutex.notifyAll();
-        }
-
-        for (int i = 0; i < notifs.size(); i++)
-            ((Notification)notifs.elementAt(i)).entryAdd(value);
-    }
-
-    public void _addAtHead(Object value)
-    {
-        /*lock the queue from other threads*/
-        synchronized (mutex)
-        {
-            internalQueue.addFirst(value);
-
-            /*wake up all the threads that are waiting for the lock to be released*/
-            mutex.notifyAll();
-        }
-
-        for (int i = 0; i < notifs.size(); i++)
-            ((Notification)notifs.elementAt(i)).entryAdd(value);
-    }
-
-    public void _reset()
-    {
-        if (logger.isDebugEnabled())
-        {
-            logger.debug(groupname + '@' + getLocalAddress() + " _reset()");
-        }
-
-        _private_reset();
-
-        for (int i = 0; i < notifs.size(); i++)
-            ((Notification)notifs.elementAt(i)).contentsCleared();
-    }
-
-    protected void _private_reset()
-    {
-        /*lock the queue from other threads*/
-        synchronized (mutex)
-        {
-            internalQueue.clear();
-
-            /*wake up all the threads that are waiting for the lock to be released*/
-            mutex.notifyAll();
-        }
-    }
-
-    public Object _remove()
-    {
-        Object retval = null;
-
-        try
-        {
-            /*lock the queue from other threads*/
-            synchronized (mutex)
-            {
-                retval = internalQueue.removeFirst();
-
-                /*wake up all the threads that are waiting for the lock to be released*/
-                mutex.notifyAll();
-            }
-
-            if (logger.isDebugEnabled())
-            {
-                logger.debug(groupname + '@' + getLocalAddress() + "_remove(" + retval + ')');
-            }
-
-            for (int i = 0; i < notifs.size(); i++)
-                ((Notification)notifs.elementAt(i)).entryRemoved(retval);
-        }
-         catch (NoSuchElementException e)
-        {
-            logger.debug(groupname + '@' + getLocalAddress() + "_remove(): nothing to remove");
-        }
-
-        return retval;
-    }
-
-    public void _addAll(Collection c)
-    {
-        if (logger.isDebugEnabled())
-        {
-            logger.debug(groupname + '@' + getLocalAddress() + " _addAll(" + c + ')');
-        }
-
-        /*lock the queue from other threads*/
-        synchronized (mutex)
-        {
-            internalQueue.addAll(c);
-
-            /*wake up all the threads that are waiting for the lock to be released*/
-            mutex.notifyAll();
-        }
-
-        for (int i = 0; i < notifs.size(); i++)
-            ((Notification)notifs.elementAt(i)).contentsSet(c);
-    }
-
-    /*----------------------------------------------------------*/
-    /*-------------------- State Exchange ----------------------*/
-    public void receive(Message msg)
-    {
-    }
-
-    public byte[] getState()
-    {
-        Vector copy = (Vector)getContents().clone();
-
-        try
-        {
-            return Util.objectToByteBuffer(copy);
-        }
-         catch (Throwable ex)
-        {
-            logger.error(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUEGETSTATE_EXCEPTION_MARSHALLING_STATE, ex);
-
-            return null;
-        }
-    }
-
-    public void setState(byte[] new_state)
-    {
-        Vector new_copy;
-
-        try
-        {
-            new_copy = (Vector)Util.objectFromByteBuffer(new_state);
-
-            if (new_copy == null)
-            {
-                return;
-            }
-        }
-         catch (Throwable ex)
-        {
-            logger.error(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUESETSTATE_EXCEPTION_UNMARSHALLING_STATE, ex);
-
-            return;
-        }
-
-        _private_reset(); // remove all elements      
-        _addAll(new_copy);
-    }
-
-    /*------------------- Membership Changes ----------------------*/
-    public void viewAccepted(View new_view)
-    {
-        Vector new_mbrs = new_view.getMembers();
-
-        if (new_mbrs != null)
-        {
-            sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
-            members.removeAllElements();
-
-            for (int i = 0; i < new_mbrs.size(); i++)
-                members.addElement(new_mbrs.elementAt(i));
-        }
-    }
-
-    /** Called when a member is suspected */
-    public void suspect(SuspectMember suspected_mbr)
-    {
-        ;
-    }
-
-    /** Block sending and receiving of messages until ViewAccepted is called */
-    public void block()
-    {
-    }
-
-    public void channelClosing(Channel c, Exception e) {} // GemStoneAddition
-    
-    
-    void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs)
-    {
-        Vector joined;
-        Vector left;
-        Object mbr;
-        Notification n;
-
-        if ((notifs.size() == 0) || (old_mbrs == null) || (new_mbrs == null) || (old_mbrs.size() == 0) ||
-                (new_mbrs.size() == 0))
-        {
-            return;
-        }
-
-        // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
-        joined = new Vector();
-
-        for (int i = 0; i < new_mbrs.size(); i++)
-        {
-            mbr = new_mbrs.elementAt(i);
-
-            if (!old_mbrs.contains(mbr))
-            {
-                joined.addElement(mbr);
-            }
-        }
-
-        // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
-        left = new Vector();
-
-        for (int i = 0; i < old_mbrs.size(); i++)
-        {
-            mbr = old_mbrs.elementAt(i);
-
-            if (!new_mbrs.contains(mbr))
-            {
-                left.addElement(mbr);
-            }
-        }
-
-        for (int i = 0; i < notifs.size(); i++)
-        {
-            n = (Notification)notifs.elementAt(i);
-            n.viewChange(joined, left);
-        }
-    }
-
-    void initSignatures()
-    {
-        try
-        {
-            if (add_signature == null)
-            {
-                add_signature = new Class[] { Object.class };
-            }
-
-            if (addAtHead_signature == null)
-            {
-                addAtHead_signature = new Class[] { Object.class };
-            }
-
-            if (addAll_signature == null)
-            {
-                addAll_signature = new Class[] { Collection.class };
-            }
-
-            if (reset_signature == null)
-            {
-                reset_signature = new Class[0];
-            }
-
-            if (remove_signature == null)
-            {
-                remove_signature = new Class[0];
-            }
-        }
-         catch (Throwable ex)
-        {
-           logger.error(ExternalStrings.DistributedQueue_DISTRIBUTEDQUEUEINITMETHODS, ex);
-        }
-    }
-
-//    public static void main(String[] args)
-//    {
-//        try
-//        {
-//            // The setup here is kind of weird:
-//            // 1. Create a channel
-//            // 2. Create a DistributedQueue (on the channel)
-//            // 3. Connect the channel (so the HT gets a VIEW_CHANGE)
-//            // 4. Start the HT
-//            //
-//            // A simpler setup is
-//            // DistributedQueue ht = new DistributedQueue("demo", null, 
-//            //         "file://c:/JGroups-2.0/conf/total-token.xml", 5000);
-//            JChannel c = new JChannel("file:/c:/JGroups-2.0/conf/conf/total-token.xml");
-//            c.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-//
-//            DistributedQueue ht = new DistributedQueue(c);
-//            c.connect("demo");
-//            ht.start(5000);
-//
-//            ht.add("name");
-//            ht.add("Michelle Ban");
-//
-//            Object old_key = ht.remove();
-//            System.out.println("old key was " + old_key);
-//            old_key = ht.remove();
-//            System.out.println("old value was " + old_key);
-//
-//            ht.add("name 'Michelle Ban'");
-//
-//            System.out.println("queue is " + ht);
-//        }
-//        catch (VirtualMachineError err) { // GemStoneAddition
-//          SystemFailure.initiateFailure(err);
-//          // If this ever returns, rethrow the error.  We're poisoned
-//          // now, so don't let this thread continue.
-//          throw err;
-//        }
-//         catch (Throwable t)
-//        {
-//           SystemFailure.checkFailure(); // GemStoneAddition
-//            t.printStackTrace();
-//        }
-//    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedTree.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedTree.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedTree.java
deleted file mode 100644
index f9d392b..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/DistributedTree.java
+++ /dev/null
@@ -1,756 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: DistributedTree.java,v 1.14 2005/11/10 20:54:01 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.StringTokenizer;
-import java.util.Vector;
-
-
-
-
-/**
- * A tree-like structure that is replicated across several members. Updates will be multicast to all group
- * members reliably and in the same order.
- * @author Bela Ban
- * @author <a href="mailto:aolias@yahoo.com">Alfonso Olias-Sanz</a>
- */
-public class DistributedTree implements MessageListener, MembershipListener {
-    Node root=null;
-    final Vector listeners=new Vector();
-    final Vector view_listeners=new Vector();
-    final Vector members=new Vector();
-    protected Channel channel=null;
-    protected RpcDispatcher disp=null;
-    String groupname="DistributedTreeGroup";
-    String channel_properties="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=0):" +
-            "PING(timeout=5000;num_initial_members=6):" +
-            "FD_SOCK:" +
-            "VERIFY_SUSPECT(timeout=1500):" +
-            "pbcast.STABLE(desired_avg_gossip=10000):" +
-            "pbcast.NAKACK(gc_lag=5;retransmit_timeout=3000;trace=true):" +
-            "UNICAST(timeout=5000):" +
-            "FRAG(down_thread=false;up_thread=false):" +
-            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
-            "shun=false;print_local_addr=true):" +
-            "pbcast.STATE_TRANSFER(trace=true)";
-    final long state_timeout=5000;   // wait 5 secs max to obtain state
-
-	/** Determines when the updates have to be sent across the network, avoids sending unnecessary
-     * messages when there are no member in the group */
-	private boolean send_message = false;
-
-    protected static final GemFireTracer log=GemFireTracer.getLog(DistributedTree.class);
-
-
-
-    public interface DistributedTreeListener {
-        void nodeAdded(String fqn, Serializable element);
-
-        void nodeRemoved(String fqn);
-
-        void nodeModified(String fqn, Serializable old_element, Serializable new_element);
-    }
-
-
-    public interface ViewListener {
-        void viewChange(Vector new_mbrs, Vector old_mbrs);
-    }
-
-
-    public DistributedTree() {
-    }
-
-
-    public DistributedTree(String groupname, String channel_properties) {
-        this.groupname=groupname;
-        if(channel_properties != null)
-            this.channel_properties=channel_properties;
-    }
-
-    /*
-     * Uses a user-provided PullPushAdapter to create the dispatcher rather than a Channel. If id is non-null, it will be
-     * used to register under that id. This is typically used when another building block is already using
-     * PullPushAdapter, and we want to add this building block in addition. The id is the used to discriminate
-     * between messages for the various blocks on top of PullPushAdapter. If null, we will assume we are the
-     * first block created on PullPushAdapter.
-     * @param adapter The PullPushAdapter which to use as underlying transport
-     * @param id A serializable object (e.g. an Integer) used to discriminate (multiplex/demultiplex) between
-     *           requests/responses for different building blocks on top of PullPushAdapter.
-     * @param state_timeout Max number of milliseconds to wait until state is
-     * retrieved
-     */
-    public DistributedTree(PullPushAdapter adapter, Serializable id, long state_timeout) 
-        throws ChannelException {
-        channel = (Channel)adapter.getTransport();
-        disp=new RpcDispatcher(adapter, id, this, this, this);
-        channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-        boolean rc = channel.getState(null, state_timeout);
-        if(rc) {
-            if(log.isInfoEnabled()) log.info(ExternalStrings.DistributedTree_STATE_WAS_RETRIEVED_SUCCESSFULLY);
-        }
-        else
-            if(log.isInfoEnabled()) log.info(ExternalStrings.DistributedTree_STATE_COULD_NOT_BE_RETRIEVED_MUST_BE_FIRST_MEMBER_IN_GROUP);
-    }
-
-    public Object getLocalAddress() {
-        return channel != null? channel.getLocalAddress() : null;
-    }
-
-    public void setDeadlockDetection(boolean flag) {
-        if(disp != null)
-            disp.setDeadlockDetection(flag);
-    }
-
-    public void start() throws Exception {
-        start(8000);
-    }
-
-
-    public void start(long timeout) throws Exception {
-        if(channel != null) // already started
-            return;
-        channel=new JChannel(channel_properties);
-        disp=new RpcDispatcher(channel, this, this, this);
-        channel.connect(groupname);
-        channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
-        boolean rc=channel.getState(null, timeout);
-        if(rc) {
-            if(log.isInfoEnabled()) log.info(ExternalStrings.DistributedTree_STATE_WAS_RETRIEVED_SUCCESSFULLY);
-        }
-        else
-            if(log.isInfoEnabled()) log.info(ExternalStrings.DistributedTree_STATE_COULD_NOT_BE_RETRIEVED_MUST_BE_FIRST_MEMBER_IN_GROUP);
-    }
-
-
-    public void stop() {
-        if(channel != null) {
-            channel.close();
-            disp.stop();
-        }
-        channel=null;
-        disp=null;
-    }
-
-
-    public void addDistributedTreeListener(DistributedTreeListener listener) {
-        if(!listeners.contains(listener))
-            listeners.addElement(listener);
-    }
-
-
-    public void removeDistributedTreeListener(DistributedTreeListener listener) {
-        listeners.removeElement(listener);
-    }
-
-
-    public void addViewListener(ViewListener listener) {
-        if(!view_listeners.contains(listener))
-            view_listeners.addElement(listener);
-    }
-
-
-    public void removeViewListener(ViewListener listener) {
-        view_listeners.removeElement(listener);
-    }
-
-
-    public void add(String fqn) {
-        //Changes done by <aos>
-        //if true, propagate action to the group
-        if(send_message == true) {
-            try {
-				MethodCall call = new MethodCall("_add", new Object[] {fqn}, new String[] {String.class.getName()});
-                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, 0);
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_0, ex);
-            }
-        }
-        else {
-            _add(fqn);
-        }
-    }
-
-    public void add(String fqn, Serializable element) {
-        add(fqn, element, 0);
-    }
-
-    /** resets an existing node, useful after a merge when you want to tell other 
-     *  members of your state, but do not wish to remove and then add as two separate calls */
-    public void reset(String fqn, Serializable element) 
-    {
-        reset(fqn, element, 0);
-    }
-
-    public void remove(String fqn) {
-        remove(fqn, 0);
-    }
-
-    public void add(String fqn, Serializable element, int timeout) {
-        //Changes done by <aos>
-        //if true, propagate action to the group
-        if(send_message == true) {
-            try {
-				MethodCall call = new MethodCall("_add", new Object[] {fqn, element}, 
-                    new String[] {String.class.getName(), Serializable.class.getName()});
-                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_0, ex);
-            }
-        }
-        else {
-            _add(fqn, element);
-        }
-    }
-
-    /** resets an existing node, useful after a merge when you want to tell other 
-     *  members of your state, but do not wish to remove and then add as two separate calls */
-    public void reset(String fqn, Serializable element, int timeout) 
-    {
-        //Changes done by <aos>
-        //if true, propagate action to the group
-        if(send_message == true) {
-            try {
-				MethodCall call = new MethodCall("_reset", new Object[] {fqn, element}, 
-                    new String[] {String.class.getName(), Serializable.class.getName()});
-                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_0, ex);
-            }
-        }
-        else {
-            _add(fqn, element);
-        }
-    }
-
-    public void remove(String fqn, int timeout) {
-        //Changes done by <aos>
-        //if true, propagate action to the group
-        if(send_message == true) {
-            try {
-            	MethodCall call = new MethodCall("_remove", new Object[] {fqn}, new String[] {String.class.getName()});
-                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_0, ex);
-            }
-        }
-        else {
-            _remove(fqn);
-        }
-    }
-
-
-    public boolean exists(String fqn) {
-        if(fqn == null)
-            return false;
-        return findNode(fqn) == null? false : true;
-    }
-
-
-    public Serializable get(String fqn) {
-        Node n=null;
-
-        if(fqn == null) return null;
-        n=findNode(fqn);
-        if(n != null) {
-            return n.element;
-        }
-        return null;
-    }
-
-
-    public void set(String fqn, Serializable element) {
-		set(fqn, element, 0);
-    }
-
-    public void set(String fqn, Serializable element, int timeout) {
-		//Changes done by <aos>
-		//if true, propagate action to the group
-        if(send_message == true) {
-            try {
-				MethodCall call = new MethodCall("_set", new Object[] {fqn, element}, 
-                    new String[] {String.class.getName(), Serializable.class.getName()});
-                disp.callRemoteMethods(null, call, GroupRequest.GET_ALL, timeout);
-            }
-            catch(Exception ex) {
-                if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_0, ex);
-            }
-        }
-        else {
-            _set(fqn, element);
-        }
-    }
-
-
-    /** Returns all children of a Node as strings */
-    public Vector getChildrenNames(String fqn) {
-        Vector ret=new Vector();
-        Node n;
-
-        if(fqn == null) return ret;
-        n=findNode(fqn);
-        if(n == null || n.children == null) return ret;
-        for(int i=0; i < n.children.size(); i++)
-            ret.addElement(((Node)n.children.elementAt(i)).name);
-        return ret;
-    }
-
-
-    public String print() {
-        StringBuffer sb=new StringBuffer();
-        int indent=0;
-
-        if(root == null)
-            return "/";
-
-        sb.append(root.print(indent));
-        return sb.toString();
-    }
-
-
-    /** Returns all children of a Node as Nodes */
-    Vector getChildren(String fqn) {
-        Node n;
-
-        if(fqn == null) return null;
-        n=findNode(fqn);
-        if(n == null) return null;
-        return n.children;
-    }
-
-    /**
-     * Returns the name of the group that the DistributedTree is connected to
-     * @return String
-     */
-    public String  getGroupName()           {return groupname;}
-	 	
-    /**
-     * Returns the Channel the DistributedTree is connected to 
-     * @return Channel
-     */
-    public Channel getChannel()             {return channel;}
-
-   /**
-     * Returns the number of current members joined to the group
-     * @return int
-     */
-    public int getGroupMembersNumber()			{return members.size();}
-
-
-
-
-    /*--------------------- Callbacks --------------------------*/
-
-    public void _add(String fqn) {
-        _add(fqn, null);
-    }
-
-
-    public void _add(String fqn, Serializable element) {
-        Node curr, n;
-        StringTokenizer tok;
-        String child_name;
-        String tmp_fqn="";
-
-        if(root == null) {
-            root=new Node("/", null);
-            notifyNodeAdded("/", null);
-        }
-        if(fqn == null)
-            return;
-        curr=root;
-        tok=new StringTokenizer(fqn, "/");
-
-        while(tok.hasMoreTokens()) {
-            child_name=tok.nextToken();
-            tmp_fqn=tmp_fqn + '/' + child_name;
-            n=curr.findChild(child_name);
-            if(n == null) {
-                n=new Node(child_name, null);
-                curr.addChild(n);
-                if(!tok.hasMoreTokens()) {
-                    n.element=element;
-                    notifyNodeAdded(tmp_fqn, element);
-                    return;
-                }
-                else
-                    notifyNodeAdded(tmp_fqn, null);
-            }
-            curr=n;
-        }
-        curr.element=element;
-        notifyNodeModified(fqn, null, element);
-    }
-
-
-    public void _remove(String fqn) {
-        Node curr, n;
-        StringTokenizer tok;
-        String child_name=null;
-
-        if(fqn == null || root == null)
-            return;
-        curr=root;
-        tok=new StringTokenizer(fqn, "/");
-
-        while(tok.countTokens() > 1) {
-            child_name=tok.nextToken();
-            n=curr.findChild(child_name);
-            if(n == null) // node does not exist
-                return;
-            curr=n;
-        }
-        try {
-            child_name=tok.nextToken();
-            if(child_name != null) {
-                n=curr.removeChild(child_name);
-                if(n != null)
-                    notifyNodeRemoved(fqn);
-            }
-        }
-        catch(Exception ex) {
-        }
-    }
-
-
-    public void _set(String fqn, Serializable element) {
-        Node n;
-        Serializable old_el=null;
-
-        if(fqn == null || element == null) return;
-        n=findNode(fqn);
-        if(n == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_NODE__0__NOT_FOUND, fqn);
-            return;
-        }
-        old_el=n.element;
-        n.element=element;
-        notifyNodeModified(fqn, old_el, element);
-    }
-
-    /** similar to set, but does not error if node does not exist, but rather does an add instead */
-    public void _reset(String fqn, Serializable element) {
-        Node n;
-        Serializable old_el=null;
-
-        if(fqn == null || element == null) return;
-        n=findNode(fqn);
-        if(n == null) {
-            _add(fqn, element);
-            return;
-        }
-        old_el=n.element;
-        n.element=element;
-        notifyNodeModified(fqn, old_el, element);
-    }
-
-    /*----------------- End of  Callbacks ----------------------*/
-
-
-
-
-
-
-    /*-------------------- State Exchange ----------------------*/
-
-    public void receive(Message msg) {
-    }
-
-    /** Return a copy of the tree */
-    public byte[] getState() {
-        Object copy=root != null? root.copy() : null;
-        try {
-            return Util.objectToByteBuffer(copy);
-        }
-        catch(Throwable ex) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_MARSHALLING_STATE__0, ex);
-            return null;
-        }
-    }
-
-    public void setState(byte[] data) {
-        Object new_state;
-
-        try {
-            new_state=Util.objectFromByteBuffer(data);
-        }
-        catch(Throwable ex) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_EXCEPTION_UNMARSHALLING_STATE__0, ex);
-            return;
-        }
-        if(new_state == null) return;
-        if(!(new_state instanceof Node)) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_OBJECT_IS_NOT_OF_TYPE_NODE);
-            return;
-        }
-        root=((Node)new_state).copy();
-    }
-
-
-
-    /*------------------- Membership Changes ----------------------*/
-
-    public void viewAccepted(View new_view) {
-        Vector new_mbrs=new_view.getMembers();
-
-        if(new_mbrs != null) {
-            sendViewChangeNotifications(new_mbrs, members); // notifies observers (joined, left)
-            members.removeAllElements();
-            for(int i=0; i < new_mbrs.size(); i++)
-                members.addElement(new_mbrs.elementAt(i));
-        }
-		//if size is bigger than one, there are more peers in the group
-		//otherwise there is only one server.
-        if(members.size() > 1) {
-            send_message=true;
-        }
-        else {
-            send_message=false;
-        }
-    }
-
-
-    /** Called when a member is suspected */
-    public void suspect(SuspectMember suspected_mbr) {
-    }
-
-
-    /** Block sending and receiving of messages until ViewAccepted is called */
-    public void block() {
-    }
-
-
-    public void channelClosing(Channel c, Exception e) {} // GemStoneAddition
-    
-    
-    void sendViewChangeNotifications(Vector new_mbrs, Vector old_mbrs) {
-        Vector joined, left;
-        Object mbr;
-
-        if(view_listeners.size() == 0 || old_mbrs == null || new_mbrs == null)
-            return;
-
-
-        // 1. Compute set of members that joined: all that are in new_mbrs, but not in old_mbrs
-        joined=new Vector();
-        for(int i=0; i < new_mbrs.size(); i++) {
-            mbr=new_mbrs.elementAt(i);
-            if(!old_mbrs.contains(mbr))
-                joined.addElement(mbr);
-        }
-
-
-        // 2. Compute set of members that left: all that were in old_mbrs, but not in new_mbrs
-        left=new Vector();
-        for(int i=0; i < old_mbrs.size(); i++) {
-            mbr=old_mbrs.elementAt(i);
-            if(!new_mbrs.contains(mbr))
-                left.addElement(mbr);
-        }
-        notifyViewChange(joined, left);
-    }
-
-
-    Node findNode(String fqn) {
-        Node curr=root;
-        StringTokenizer tok;
-        String child_name;
-
-        if(fqn == null || root == null) return null;
-        if("/".equals(fqn) || "".equals(fqn))
-            return root;
-
-        tok=new StringTokenizer(fqn, "/");
-        while(tok.hasMoreTokens()) {
-            child_name=tok.nextToken();
-            curr=curr.findChild(child_name);
-            if(curr == null) return null;
-        }
-        return curr;
-    }
-
-
-    void notifyNodeAdded(String fqn, Serializable element) {
-        for(int i=0; i < listeners.size(); i++)
-            ((DistributedTreeListener)listeners.elementAt(i)).nodeAdded(fqn, element);
-    }
-
-    void notifyNodeRemoved(String fqn) {
-        for(int i=0; i < listeners.size(); i++)
-            ((DistributedTreeListener)listeners.elementAt(i)).nodeRemoved(fqn);
-    }
-
-    void notifyNodeModified(String fqn, Serializable old_element, Serializable new_element) {
-        for(int i=0; i < listeners.size(); i++)
-            ((DistributedTreeListener)listeners.elementAt(i)).nodeModified(fqn, old_element, new_element);
-    }
-
-    /** Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
-     initially retrieved (state transfer) */
-    void notifyAllNodesCreated(Node curr, String tmp_fqn) {
-        Node n;
-
-        if(curr == null) return;
-        if(curr.name == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_CURRNAME_IS_NULL);
-            return;
-        }
-
-        if(curr.children != null) {
-            for(int i=0; i < curr.children.size(); i++) {
-                n=(Node)curr.children.elementAt(i);
-                System.out.println("*** nodeCreated(): tmp_fqn is " + tmp_fqn);
-                notifyNodeAdded(tmp_fqn, n.element);
-                notifyAllNodesCreated(n, tmp_fqn + '/' + n.name);
-            }
-        }
-    }
-
-
-    void notifyViewChange(Vector new_mbrs, Vector old_mbrs) {
-        for(int i=0; i < view_listeners.size(); i++)
-            ((ViewListener)view_listeners.elementAt(i)).viewChange(new_mbrs, old_mbrs);
-    }
-
-
-    private static class Node implements Serializable {
-        private static final long serialVersionUID = 2751686100384721286L;
-        String name=null;
-        Vector children=null;
-        Serializable element=null;
-
-
-        Node() {
-        }
-
-        Node(String name, Serializable element) {
-            this.name=name;
-            this.element=element;
-        }
-
-
-        void addChild(String relative_name, Serializable element) {
-            if(relative_name == null)
-                return;
-            if(children == null)
-                children=new Vector();
-            else {
-                if(!children.contains(relative_name))
-                    children.addElement(new Node(relative_name, element));
-            }
-        }
-
-
-        void addChild(Node n) {
-            if(n == null) return;
-            if(children == null)
-                children=new Vector();
-            if(!children.contains(n))
-                children.addElement(n);
-        }
-
-
-        Node removeChild(String rel_name) {
-            Node n=findChild(rel_name);
-
-            if(n != null)
-                children.removeElement(n);
-            return n;
-        }
-
-
-        Node findChild(String relative_name) {
-            Node child;
-
-            if(children == null || relative_name == null)
-                return null;
-            for(int i=0; i < children.size(); i++) {
-                child=(Node)children.elementAt(i);
-                if(child.name == null) {
-                    if(log.isErrorEnabled()) log.error(ExternalStrings.DistributedTree_CHILDNAME_IS_NULL_FOR__0, relative_name);
-                    continue;
-                }
-
-                if(child.name.equals(relative_name))
-                    return child;
-            }
-
-            return null;
-        }
-
-
-        @Override // GemStoneAddition
-        public boolean equals(Object other) {
-          if (!(other instanceof Node)) return false; // GemStoneAddition
-            return /*other != null && */ ((Node)other).name != null && name != null && name.equals(((Node)other).name);
-        }
-
-        @Override // GemStoneAddition
-        public int hashCode() { // GemStoneAddition
-          return 0; // TODO more efficient implementation :-)
-        }
-
-        Node copy() {
-            Node ret=new Node(name, element);
-
-            if(children != null)
-                ret.children=(Vector)children.clone();
-            return ret;
-        }
-
-
-        String print(int indent) {
-            StringBuffer sb=new StringBuffer();
-            boolean is_root=name != null && "/".equals(name);
-
-            for(int i=0; i < indent; i++)
-                sb.append(' ');
-            if(!is_root) {
-                if(name == null)
-                    sb.append("/<unnamed>");
-                else {
-                    sb.append('/' + name);
-                    // if(element != null) sb.append(" --> " + element);
-                }
-            }
-            sb.append('\n');
-            if(children != null) {
-                if(is_root)
-                    indent=0;
-                else
-                    indent+=4;
-                for(int i=0; i < children.size(); i++)
-                    sb.append(((Node)children.elementAt(i)).print(indent));
-            }
-            return sb.toString();
-        }
-
-
-        @Override // GemStoneAddition
-        public String toString() {
-            if(element != null)
-                return "[name: " + name + ", element: " + element + ']';
-            else
-                return "[name: " + name + ']';
-        }
-
-    }
-
-
-}
-
-


Mime
View raw message