geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [05/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:22:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
deleted file mode 100644
index 9eec23a..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
+++ /dev/null
@@ -1,890 +0,0 @@
-// $Id$
-
-package org.jgroups.protocols.pbcast;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.Streamable;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable, i.e. have been received by all members. Sends
- * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
- * A stability vector, which maintains the highest seqno for each member and initially contains no data,
- * is updated when such a message is received. The entry for a member P is computed set to
- * min(entry[P], digest[P]). When messages from all members have been received, a stability
- * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
- * in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
- * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol {
-    Address             local_addr=null;
-    final Vector        mbrs=new Vector();
-    final Digest        digest=new Digest(10);        // keeps track of the highest seqnos from all members
-    final Vector        heard_from=new Vector();      // keeps track of who we already heard from (STABLE_GOSSIP msgs)
-
-    /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
-    long                desired_avg_gossip=20000;
-
-    /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
-     * small number (> 0 !) if <code>max_bytes</code> is used */
-    long                stability_delay=6000;
-    StabilitySendTask   stability_task=null;
-    final Object        stability_mutex=new Object(); // to synchronize on stability_task
-    StableTask          stable_task=null;             // bcasts periodic STABLE message (added to timer below)
-    final Object        stable_task_mutex=new Object(); // to sync on stable_task
-    TimeScheduler       timer=null;                   // to send periodic STABLE msgs (and STABILITY messages)
-    int                 max_gossip_runs=3;            // max. number of times the StableTask runs before terminating
-    int                 num_gossip_runs=max_gossip_runs; // this number is decremented (max_gossip_runs doesn't change)
-    static final String name="STABLE";
-
-    /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
-     * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
-     * <code>stability_delay</code> should be set to a low number as well */
-    long                max_bytes=0;
-
-    /** The total number of bytes received from unicast and multicast messages */
-    long                num_bytes_received=0;
-
-    /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
-     * handle STABILITY messages */
-    boolean             suspended=false;
-
-    /** Max time we should hold off on message garbage collection. This is a second line of defense in case
-     * we get a SUSPEND_STABLE, but forget to send a corresponding RESUME_STABLE (which should never happen !)
-     * The consequence of a missing RESUME_STABLE would be that the group doesn't garbage collect stable
-     * messages anymore, eventually, with a lot of traffic, every member would accumulate messages and run
-     * out of memory !
-     */
-    // long                max_suspend_time=600000;
-
-    ResumeTask          resume_task=null;
-    final Object        resume_task_mutex=new Object();
-
-    /** Number of gossip messages */
-    int                 num_gossips=0;
-
-
-    public String getName() {
-        return name;
-    }
-
-    public long getDesiredAverageGossip() {
-        return desired_avg_gossip;
-    }
-
-    public void setDesiredAverageGossip(long gossip_interval) {
-        desired_avg_gossip=gossip_interval;
-    }
-
-    public long getMaxBytes() {
-        return max_bytes;
-    }
-
-    public void setMaxBytes(long max_bytes) {
-        this.max_bytes=max_bytes;
-    }
-
-    public int getNumberOfGossipMessages() {return num_gossips;}
-
-    public void resetStats() {
-        super.resetStats();
-        num_gossips=0;
-    }
-
-
-    public Vector requiredDownServices() {
-        Vector retval=new Vector();
-        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));  // NAKACK layer
-        return retval;
-    }
-
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("digest_timeout");
-        if(str != null) {
-            props.remove("digest_timeout");
-            log.error("digest_timeout has been deprecated; it will be ignored");
-        }
-
-        str=props.getProperty("desired_avg_gossip");
-        if(str != null) {
-            desired_avg_gossip=Long.parseLong(str);
-            props.remove("desired_avg_gossip");
-        }
-
-        str=props.getProperty("stability_delay");
-        if(str != null) {
-            stability_delay=Long.parseLong(str);
-            props.remove("stability_delay");
-        }
-
-        str=props.getProperty("max_gossip_runs");
-        if(str != null) {
-            max_gossip_runs=Integer.parseInt(str);
-            num_gossip_runs=max_gossip_runs;
-            props.remove("max_gossip_runs");
-        }
-
-        str=props.getProperty("max_bytes");
-        if(str != null) {
-            max_bytes=Long.parseLong(str);
-            props.remove("max_bytes");
-        }
-
-        str=props.getProperty("max_suspend_time");
-        if(str != null) {
-            log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
-            props.remove("max_suspend_time");
-        }
-
-        if(props.size() > 0) {
-            log.error("these properties are not recognized: " + props);
-            
-            return false;
-        }
-        return true;
-    }
-
-
-    private void suspend(long timeout) {
-        if(!suspended) {
-            suspended=true;
-            if(log.isDebugEnabled())
-                log.debug("suspending message garbage collection");
-        }
-        startResumeTask(timeout); // will not start task if already running
-    }
-
-    private void resume() {
-        suspended=false;
-        if(log.isDebugEnabled())
-            log.debug("resuming message garbage collection");
-        stopResumeTask();
-    }
-
-    public void start() throws Exception {
-        if(stack != null && stack.timer != null)
-            timer=stack.timer;
-        else
-            throw new Exception("timer cannot be retrieved from protocol stack");
-        initializeDigest();
-    }
-
-    public void stop() {
-        stopStableTask();
-        clearDigest();
-    }
-
-
-    public void up(Event evt) {
-        Message msg;
-        StableHeader hdr;
-        Header obj;
-        int type=evt.getType();
-
-        switch(type) {
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-
-            if(max_bytes > 0) {  // message counting is enabled
-                long size=Math.max(msg.getLength(), 24);
-                num_bytes_received+=size;
-                if(num_bytes_received >= max_bytes) {
-                    if(log.isTraceEnabled()) {
-                        StringBuffer sb=new StringBuffer("max_bytes has been reached (max_bytes=");
-                        sb.append(max_bytes).append(", number of bytes received=");
-                        sb.append(num_bytes_received).append("): sending STABLE message");
-                        log.trace(sb.toString());
-                    }
-                    // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
-                    passDown(new Event(Event.GET_DIGEST_STABLE));
-                    num_bytes_received=0;
-                }
-            }
-
-            obj=msg.getHeader(name);
-            if(obj == null || !(obj instanceof StableHeader))
-                break;
-            hdr=(StableHeader)msg.removeHeader(name);
-            switch(hdr.type) {
-            case StableHeader.STABLE_GOSSIP:
-                handleStableGossip(msg.getSrc(), hdr.stableDigest);
-                break;
-            case StableHeader.STABILITY:
-                handleStabilityMessage(hdr.stableDigest);
-                break;
-            default:
-                if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
-            }
-            return;  // don't pass STABLE or STABILITY messages up the stack
-
-        case Event.VIEW_CHANGE:
-            View view=(View)evt.getArg();
-            handleViewChange(view);
-            break;
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-
-        case Event.GET_DIGEST_STABLE_OK:
-            Digest d=(Digest)evt.getArg();
-            updateFromOwnDigest(d);
-            sendStableMessage();
-            break;
-        }
-
-        passUp(evt);
-        if(desired_avg_gossip > 0) {
-            if(type == Event.VIEW_CHANGE || type == Event.MSG) {
-                startStableTask(); // only starts task if not yet running
-            }
-        }
-    }
-
-
-
-
-    public void down(Event evt) {
-        int type=evt.getType();
-
-        switch(evt.getType()) {
-        case Event.VIEW_CHANGE:
-            View v=(View)evt.getArg();
-            handleViewChange(v);
-            break;
-
-        case Event.SUSPEND_STABLE:
-            long timeout=0;
-            Object t=evt.getArg();
-            if(t != null && t instanceof Long)
-                timeout=((Long)t).longValue();
-            stopStableTask();
-            suspend(timeout);
-            break;
-
-        case Event.RESUME_STABLE:
-            resume();
-            break;
-        }
-
-        if(desired_avg_gossip > 0) {
-            if(type == Event.VIEW_CHANGE || type == Event.MSG)
-                startStableTask(); // only starts task if not yet running
-        }
-
-        passDown(evt);
-    }
-
-
-    public void runMessageGarbageCollection() {
-        sendStableMessage();
-    }
-
-
-
-    /* --------------------------------------- Private Methods ---------------------------------------- */
-
-
-    private void handleViewChange(View v) {
-        Vector tmp=v.getMembers();
-        mbrs.clear();
-        mbrs.addAll(tmp);
-        resetHeardFromList(tmp);
-        stopStableTask();
-    }
-
-
-    private void initializeDigest() {
-        Address mbr;
-        synchronized(digest) {
-            for(int i=0; i < mbrs.size(); i++) {
-                mbr=(Address)mbrs.get(i);
-                if(!digest.contains(mbr))
-                    digest.add(mbr, -1, -1);
-            }
-        }
-    }
-
-    private void clearDigest() {
-        synchronized(digest) {
-            digest.clear();
-        }
-    }
-
-    /**
-     * Updates own digest (this.digest) with latest digest from NAKACK, received as with GET_DIGEST_STABLE_OK
-     * @param latest_digest
-     */
-    private void updateFromOwnDigest(Digest latest_digest) {
-        if(latest_digest == null || latest_digest.size() == 0)
-            return;
-
-        Map.Entry entry;
-        org.jgroups.protocols.pbcast.Digest.Entry val;
-        Address mbr;
-
-        synchronized(digest) {
-            for(Iterator it=latest_digest.senders.entrySet().iterator(); it.hasNext();) {
-                entry=(Map.Entry)it.next();
-                mbr=(Address)entry.getKey();
-                val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
-                if(!digest.contains(mbr)) {
-                    digest.add(mbr, val.low_seqno, val.high_seqno,  val.high_seqno_seen);
-                }
-                else {
-                    digest.set(mbr, val.low_seqno, val.high_seqno,  val.high_seqno_seen);
-                }
-            }
-        }
-    }
-
-    /** Update my own digest from a digest received by somebody else. Returns the number of elements updated
-     *  Needs to be called with a lock on digest */
-    private int updateFromOtherDigest(Digest d) {
-        if(d == null || d.size() == 0)
-            return 0;
-
-        Address mbr;
-        long highest_seqno, my_highest_seqno;
-        long highest_seen_seqno, my_highest_seen_seqno;
-        Map.Entry entry;
-        org.jgroups.protocols.pbcast.Digest.Entry val;
-        int num_elements_updated=0;
-        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            mbr=(Address)entry.getKey();
-            if(!digest.contains(mbr)) {
-                if(log.isTraceEnabled()) log.trace("sender " + mbr + " not found in my digest");
-                continue;
-            }
-            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
-            highest_seqno=val.high_seqno;
-            highest_seen_seqno=val.high_seqno_seen;
-
-            // compute the minimum of the highest seqnos deliverable (for garbage collection)
-            my_highest_seqno=digest.highSeqnoAt(mbr);
-            digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
-
-            // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
-            my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
-            digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
-
-            num_elements_updated++;
-        }
-        return num_elements_updated;
-    }
-
-
-    private void resetHeardFromList(Vector new_members) {
-        if(new_members == null || new_members.size() == 0)
-            return;
-        synchronized(heard_from) {
-            heard_from.clear();
-            heard_from.addAll(new_members);
-            heard_from.remove(local_addr); // I don't need to hear from myself
-        }
-    }
-
-    private void removeFromHeardFromList(Address mbr) {
-        heard_from.remove(mbr); // Vector is already synchronized for a single operation
-    }
-
-
-    void startStableTask() {
-        num_gossip_runs=max_gossip_runs;
-
-        // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
-        // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
-        // 1 cycle: on the next message or view, we will start the task
-        if(stable_task != null)
-            return;
-        synchronized(stable_task_mutex) {
-            if(stable_task != null && stable_task.running()) {
-                return;  // already running
-            }
-            stable_task=new StableTask();
-            timer.add(stable_task, true); // fixed-rate scheduling
-        }
-        if(log.isTraceEnabled())
-            log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
-    }
-
-
-    void stopStableTask() {
-        // contrary to startStableTask(), we don't need double-checked locking here because this method is not
-        // called frequently
-        synchronized(stable_task_mutex) {
-            if(stable_task != null) {
-                stable_task.stop();
-                stable_task=null;
-            }
-        }
-    }
-
-
-    void startResumeTask(long max_suspend_time) {
-        max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
-        synchronized(resume_task_mutex) {
-            if(resume_task != null && resume_task.running()) {
-                return;  // already running
-            }
-            else {
-                resume_task=new ResumeTask(max_suspend_time);
-                timer.add(resume_task, true); // fixed-rate scheduling
-            }
-        }
-        if(log.isDebugEnabled())
-            log.debug("resume task started, max_suspend_time=" + max_suspend_time);
-    }
-
-
-    void stopResumeTask() {
-        synchronized(resume_task_mutex) {
-            if(resume_task != null) {
-                resume_task.stop();
-                resume_task=null;
-            }
-        }
-    }
-
-
-    void startStabilityTask(Digest d, long delay) {
-        synchronized(stability_mutex) {
-            if(stability_task != null && stability_task.running()) {
-                return;  // already running
-            }
-            else {
-                stability_task=new StabilitySendTask(d, delay);
-                timer.add(stability_task, true); // fixed-rate scheduling
-            }
-        }
-    }
-
-
-    void stopStabilityTask() {
-        synchronized(stability_mutex) {
-            if(stability_task != null) {
-                stability_task.stop();
-                stability_task=null;
-            }
-        }
-    }
-
-
-    /**
-     Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
-     <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
-     seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
-     message, which results in garbage collection of messages lower than the ones in the stability vector. The
-     maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
-     for details).
-     */
-    private void handleStableGossip(Address sender, Digest d) {
-        if(d == null || sender == null) {
-            if(log.isErrorEnabled()) log.error("digest or sender is null");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isTraceEnabled()) {
-                log.trace("STABLE message will not be handled as I'm suspended");
-            }
-            return;
-        }
-
-        if(local_addr.equals(sender))
-            return; // don't need to update myself from myself
-
-        if(log.isTraceEnabled())
-            log.trace(new StringBuffer("received digest from ").append(sender).append(": ").append(d));
-        if(!heard_from.contains(sender)) {  // already received gossip from sender; discard it
-            if(log.isTraceEnabled()) log.trace("already received gossip from " + sender);
-            return;
-        }
-
-        // we won't handle the gossip d, if d's members don't match the membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-//        if(!this.digest.sameSenders(d)) {
-//            if(log.isTraceEnabled()) {
-//                log.trace("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
-//                        this.digest + "): ignoring digest and re-initializing own digest");
-//            }
-//            resetHeardFromList(mbrs);
-//            return;
-//        }
-
-        Digest copy;
-        synchronized(digest) {
-            int num_elements_expected=digest.size(), num_elements_updated=updateFromOtherDigest(d);
-            // we can only remove the sender from heard_from if *all* elements of my digest were updated
-            if(num_elements_updated != num_elements_expected)
-                return;
-            copy=digest.copy();
-        }
-
-        removeFromHeardFromList(sender);
-        if(heard_from.size() == 0) {
-            if(log.isTraceEnabled()) log.trace("sending stability msg " + copy);
-            sendStabilityMessage(copy);
-            resetHeardFromList(mbrs);
-        }
-    }
-
-
-    /**
-     * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
-     * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
-     */
-    private void sendStableMessage() {
-        if(suspended) {
-            if(log.isTraceEnabled())
-                log.trace("will not send STABLE message as I'm suspended");
-            return;
-        }
-
-        Digest d=null;
-        Message msg=new Message(); // mcast message
-        StableHeader hdr;
-        d=this.digest.copy();
-        if(d != null && d.size() > 0) {
-            if(log.isTraceEnabled())
-                log.trace(new StringBuffer("mcasting STABLE msg, digest=").append(d).append(" (num_gossip_runs=").
-                          append(num_gossip_runs).append(", max_gossip_runs=").append(max_gossip_runs).append(')'));
-            hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
-            msg.putHeader(name, hdr);
-            num_gossips++;
-            passDown(new Event(Event.MSG, msg));
-        }
-    }
-
-
-
-    /**
-     Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
-     The reason for waiting a random amount of time is that, in the worst case, all members receive a
-     STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
-     STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
-     elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
-     waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
-     discard S2.
-     @param tmp A copy of te stability digest, so we don't need to copy it again
-     */
-    void sendStabilityMessage(Digest tmp) {
-        long delay;
-
-        if(timer == null) {
-            if(log.isErrorEnabled())
-                log.error("timer is null, cannot schedule stability message to be sent");
-            timer=stack != null ? stack.timer : null;
-            return;
-        }
-
-        // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
-        // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
-        // STABILITY msg at the same time
-        delay=Util.random(stability_delay);
-        startStabilityTask(tmp, delay);
-    }
-
-
-    void handleStabilityMessage(Digest d) {
-        if(d == null) {
-            if(log.isErrorEnabled()) log.error("stability vector is null");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isDebugEnabled()) {
-                log.debug("STABILITY message will not be handled as suspended=" + suspended);
-            }
-            return;
-        }
-
-        if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
-        stopStabilityTask();
-
-        // we won't handle the gossip d, if d's members don't match the membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-        if(!this.digest.sameSenders(d)) {
-            if(log.isDebugEnabled()) {
-                log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
-                        this.digest + "): ignoring digest and re-initializing own digest");
-            }
-            resetHeardFromList(mbrs);
-            return;
-        }
-
-        // pass STABLE event down the stack, so NAKACK can garbage collect old messages
-        passDown(new Event(Event.STABLE, d));
-    }
-
-
-
-    /* ------------------------------------End of Private Methods ------------------------------------- */
-
-
-
-
-
-
-
-    public static class StableHeader extends Header implements Streamable {
-        public static final int STABLE_GOSSIP=1;
-        public static final int STABILITY=2;
-
-        int type=0;
-        // Digest digest=new Digest();  // used for both STABLE_GOSSIP and STABILITY message
-        Digest stableDigest=null; // changed by Bela April 4 2004
-
-        public StableHeader() {
-        } // used for externalizable
-
-
-        public StableHeader(int type, Digest digest) {
-            this.type=type;
-            this.stableDigest=digest;
-        }
-
-
-        static String type2String(int t) {
-            switch(t) {
-                case STABLE_GOSSIP:
-                    return "STABLE_GOSSIP";
-                case STABILITY:
-                    return "STABILITY";
-                default:
-                    return "<unknown>";
-            }
-        }
-
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append('[');
-            sb.append(type2String(type));
-            sb.append("]: digest is ");
-            sb.append(stableDigest);
-            return sb.toString();
-        }
-
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(type);
-            if(stableDigest == null) {
-                out.writeBoolean(false);
-                return;
-            }
-            out.writeBoolean(true);
-            stableDigest.writeExternal(out);
-        }
-
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readInt();
-            boolean digest_not_null=in.readBoolean();
-            if(digest_not_null) {
-                stableDigest=new Digest();
-                stableDigest.readExternal(in);
-            }
-        }
-
-        public long size() {
-            long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
-            if(stableDigest != null)
-                retval+=stableDigest.serializedSize();
-            return retval;
-        }
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeInt(type);
-            Util.writeStreamable(stableDigest, out);
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readInt();
-            stableDigest=(Digest)Util.readStreamable(Digest.class, in);
-        }
-
-
-    }
-
-
-
-
-    /**
-     Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
-     However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
-     stable_send task terminates only after a period of time within which no messages were either sent
-     or received
-     */
-    private class StableTask implements TimeScheduler.Task {
-        boolean stopped=false;
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean running() { // syntactic sugar
-            return !stopped;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-        public long nextInterval() {
-            long interval=computeSleepTime();
-            if(interval <= 0)
-                return 10000;
-            else
-                return interval;
-        }
-
-
-        public void run() {
-            if(suspended) {
-                if(log.isTraceEnabled())
-                    log.trace("stable task will not run as suspended=" + suspended);
-                stopStableTask();
-                return;
-            }
-
-            // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
-            passDown(new Event(Event.GET_DIGEST_STABLE));
-
-            num_gossip_runs--;
-            if(num_gossip_runs <= 0) {
-                if(log.isTraceEnabled())
-                    log.trace("stable task terminating (num_gossip_runs=" +
-                              num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
-                stopStableTask();
-            }
-        }
-
-        long computeSleepTime() {
-            return getRandom((mbrs.size() * desired_avg_gossip * 2));
-        }
-
-        long getRandom(long range) {
-            return (long)((Math.random() * range) % range);
-        }
-    }
-
-
-
-
-
-    /**
-     * Multicasts a STABILITY message.
-     */
-    private class StabilitySendTask implements TimeScheduler.Task {
-        Digest   d=null;
-        boolean  stopped=false;
-        long     delay=2000;
-
-
-        public StabilitySendTask(Digest d, long delay) {
-            this.d=d;
-            this.delay=delay;
-        }
-
-        public boolean running() {
-            return !stopped;
-        }
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-
-        /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
-        public long nextInterval() {
-            return delay;
-        }
-
-
-        public void run() {
-            Message msg;
-            StableHeader hdr;
-
-            if(suspended) {
-                if(log.isDebugEnabled()) {
-                    log.debug("STABILITY message will not be sent as suspended=" + suspended);
-                }
-                stopped=true;
-                return;
-            }
-
-            if(d != null && !stopped) {
-                msg=new Message();
-                hdr=new StableHeader(StableHeader.STABILITY, d);
-                msg.putHeader(STABLE.name, hdr);
-                passDown(new Event(Event.MSG, msg));
-                d=null;
-            }
-            stopped=true; // run only once
-        }
-    }
-
-
-    private class ResumeTask implements TimeScheduler.Task {
-        boolean running=true;
-        long max_suspend_time=0;
-
-        ResumeTask(long max_suspend_time) {
-            this.max_suspend_time=max_suspend_time;
-        }
-
-        void stop() {
-            running=false;
-        }
-
-        public boolean running() {
-            return running;
-        }
-
-        public boolean cancelled() {
-            return running == false;
-        }
-
-        public long nextInterval() {
-            return max_suspend_time;
-        }
-
-        public void run() {
-            if(suspended)
-                log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
-                         "check why this event was not received (or increase max_suspend_time for large state transfers)");
-            resume();
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
deleted file mode 100644
index f8a559e..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
+++ /dev/null
@@ -1,855 +0,0 @@
-// $Id: STABLE.java,v 1.27 2005/07/13 19:57:59 belaban Exp $
-
-package org.jgroups.protocols.pbcast;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.Promise;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
-import org.jgroups.util.Streamable;
-
-import java.io.*;
-import java.util.Properties;
-import java.util.Vector;
-import java.util.Iterator;
-import java.util.Map;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable, i.e. have been received by all members. Sends
- * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
- * A stability vector, which maintains the highest seqno for each member and initially contains no data,
- * is updated when such a message is received. The entry for a member P is computed set to
- * min(entry[P], digest[P]). When messages from all members have been received, a stability
- * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
- * in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
- * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol {
-    Address             local_addr=null;
-    final Vector        mbrs=new Vector();
-    final Digest        digest=new Digest(10);        // keeps track of the highest seqnos from all members
-    final Promise       digest_promise=new Promise(); // for fetching digest (from NAKACK layer)
-    final Vector        heard_from=new Vector();      // keeps track of who we already heard from (STABLE_GOSSIP msgs)
-    long                digest_timeout=60000;         // time to wait until digest is received (from NAKACK)
-
-    /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
-    long                desired_avg_gossip=20000;
-
-    /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
-     * small number (> 0 !) if <code>max_bytes</code> is used */
-    long                stability_delay=6000;
-    StabilitySendTask   stability_task=null;
-    final Object        stability_mutex=new Object(); // to synchronize on stability_task
-    StableTask          stable_task=null;             // bcasts periodic STABLE message (added to timer below)
-    final Object        stable_task_mutex=new Object(); // to sync on stable_task
-    TimeScheduler       timer=null;                   // to send periodic STABLE msgs (and STABILITY messages)
-    int                 max_gossip_runs=3;            // max. number of times the StableTask runs before terminating
-    int                 num_gossip_runs=max_gossip_runs; // this number is decremented (max_gossip_runs doesn't change)
-    static final String name="STABLE";
-
-    /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
-     * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
-     * <code>stability_delay</code> should be set to a low number as well */
-    long                max_bytes=0;
-
-    /** The total number of bytes received from unicast and multicast messages */
-    long                num_bytes_received=0;
-
-    /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
-     * handle STABILITY messages */
-    boolean             suspended=false;
-
-    /** Max time we should hold off on message garbage collection. This is a second line of defense in case
-     * we get a SUSPEND_STABLE, but forget to send a corresponding RESUME_STABLE (which should never happen !)
-     * The consequence of a missing RESUME_STABLE would be that the group doesn't garbage collect stable
-     * messages anymore, eventually, with a lot of traffic, every member would accumulate messages and run
-     * out of memory !
-     */
-    // long                max_suspend_time=600000;
-
-    ResumeTask          resume_task=null;
-    final Object        resume_task_mutex=new Object();
-
-    /** Number of gossip messages */
-    int                 num_gossips=0;
-
-
-    public String getName() {
-        return name;
-    }
-
-    public long getDesiredAverageGossip() {
-        return desired_avg_gossip;
-    }
-
-    public void setDesiredAverageGossip(long gossip_interval) {
-        desired_avg_gossip=gossip_interval;
-    }
-
-    public long getMaxBytes() {
-        return max_bytes;
-    }
-
-    public void setMaxBytes(long max_bytes) {
-        this.max_bytes=max_bytes;
-    }
-
-    public int getNumberOfGossipMessages() {return num_gossips;}
-
-    public void resetStats() {
-        super.resetStats();
-        num_gossips=0;
-    }
-
-
-    public Vector requiredDownServices() {
-        Vector retval=new Vector();
-        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));  // NAKACK layer
-        return retval;
-    }
-
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("digest_timeout");
-        if(str != null) {
-            digest_timeout=Long.parseLong(str);
-            props.remove("digest_timeout");
-        }
-
-        str=props.getProperty("desired_avg_gossip");
-        if(str != null) {
-            desired_avg_gossip=Long.parseLong(str);
-            props.remove("desired_avg_gossip");
-        }
-
-        str=props.getProperty("stability_delay");
-        if(str != null) {
-            stability_delay=Long.parseLong(str);
-            props.remove("stability_delay");
-        }
-
-        str=props.getProperty("max_gossip_runs");
-        if(str != null) {
-            max_gossip_runs=Integer.parseInt(str);
-            num_gossip_runs=max_gossip_runs;
-            props.remove("max_gossip_runs");
-        }
-
-        str=props.getProperty("max_bytes");
-        if(str != null) {
-            max_bytes=Long.parseLong(str);
-            props.remove("max_bytes");
-        }
-
-        str=props.getProperty("max_suspend_time");
-        if(str != null) {
-            log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
-            props.remove("max_suspend_time");
-        }
-
-        if(props.size() > 0) {
-            log.error("STABLE.setProperties(): these properties are not recognized: " + props);
-            
-            return false;
-        }
-        return true;
-    }
-
-
-    void suspend(long timeout) {
-        if(!suspended) {
-            suspended=true;
-            if(log.isDebugEnabled())
-                log.debug("suspending message garbage collection");
-        }
-        startResumeTask(timeout); // will not start task if already running
-    }
-
-    void resume() {
-        suspended=false;
-        if(log.isDebugEnabled())
-            log.debug("resuming message garbage collection");
-        stopResumeTask();
-    }
-
-    public void start() throws Exception {
-        if(stack != null && stack.timer != null)
-            timer=stack.timer;
-        else
-            throw new Exception("STABLE.up(): timer cannot be retrieved from protocol stack");
-    }
-
-    public void stop() {
-        stopStableTask();
-    }
-
-
-    public void up(Event evt) {
-        Message msg;
-        StableHeader hdr;
-        Header obj;
-        int type=evt.getType();
-
-        switch(type) {
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-
-            if(max_bytes > 0) {  // message counting is enabled
-                long size=Math.max(msg.getLength(), 24);
-                num_bytes_received+=size;
-                if(num_bytes_received >= max_bytes) {
-                    if(log.isTraceEnabled()) {
-                        StringBuffer sb=new StringBuffer("max_bytes has been exceeded (max_bytes=");
-                        sb.append(max_bytes).append(", number of bytes received=");
-                        sb.append(num_bytes_received).append("): sending STABLE message");
-                        log.trace(sb.toString());
-                    }
-
-                    new Thread() {
-                        public void run() {
-                            initialize();
-                            sendStableMessage();
-                        }
-                    }.start();
-                    num_bytes_received=0;
-                }
-            }
-
-            obj=msg.getHeader(name);
-            if(obj == null || !(obj instanceof StableHeader))
-                break;
-            hdr=(StableHeader)msg.removeHeader(name);
-            switch(hdr.type) {
-            case StableHeader.STABLE_GOSSIP:
-                handleStableGossip(msg.getSrc(), hdr.stableDigest);
-                break;
-            case StableHeader.STABILITY:
-                handleStabilityMessage(hdr.stableDigest);
-                break;
-            default:
-                if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
-            }
-            return;  // don't pass STABLE or STABILITY messages up the stack
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-        }
-
-        passUp(evt);
-        if(desired_avg_gossip > 0) {
-            if(type == Event.VIEW_CHANGE || type == Event.MSG)
-                startStableTask(); // only starts task if not yet running
-        }
-    }
-
-
-    /**
-     * We need to receive this event out-of-band, otherwise we would block. The use case is
-     * <ol>
-     * <li>To send a STABLE_GOSSIP message we need the digest (from NAKACK below)
-     * <li>We send a GET_DIGEST_STABLE event down <em>from the up() method</em>
-     * <li>NAKACK sends the GET_DIGEST_STABLE_OK backup. <em>However, we may have other messages in the
-     * up queue ahead of this event !</em> Therefore the event cannot be processed until all messages ahead of
-     * the event have been processed. These can't be processed, however, because the up() call waits for
-     * GET_DIGEST_STABLE_OK ! The up() call would always run into the timeout.<be/>
-     * Having out-of-band reception of just this one event eliminates the problem.
-     * </ol>
-     * @param evt
-     */
-    protected void receiveUpEvent(Event evt) {
-        if(evt.getType() == Event.GET_DIGEST_STABLE_OK) {
-            digest_promise.setResult(evt.getArg());
-            return;
-        }
-        super.receiveUpEvent(evt);
-    }
-
-
-    public void down(Event evt) {
-        int type=evt.getType();
-
-        switch(evt.getType()) {
-            case Event.VIEW_CHANGE:
-                View v=(View)evt.getArg();
-                Vector tmp=v.getMembers();
-                mbrs.removeAllElements();
-                mbrs.addAll(tmp);
-                heard_from.retainAll(tmp);     // removes all elements from heard_from that are not in new view
-                stopStableTask();
-                break;
-
-            case Event.SUSPEND_STABLE:
-                long timeout=0;
-                Object t=evt.getArg();
-                if(t != null && t instanceof Long)
-                    timeout=((Long)t).longValue();
-                stopStableTask();
-                suspend(timeout);
-                break;
-
-            case Event.RESUME_STABLE:
-                resume();
-                break;
-        }
-
-        if(desired_avg_gossip > 0) {
-            if(type == Event.VIEW_CHANGE || type == Event.MSG)
-                startStableTask(); // only starts task if not yet running
-        }
-
-        passDown(evt);
-    }
-
-
-    public void runMessageGarbageCollection() {
-        sendStableMessage();
-    }
-
-
-
-    /* --------------------------------------- Private Methods ---------------------------------------- */
-
-    void initialize() {
-        synchronized(digest) {
-            digest.clear();
-            for(int i=0; i < mbrs.size(); i++) {
-                digest.add((Address)mbrs.elementAt(i), -1, -1);
-                // digest.add((Address)mbrs.elementAt(i), 0, 0);
-            }
-            heard_from.removeAllElements();
-            heard_from.addAll(mbrs);
-        }
-    }
-
-
-    void startStableTask() {
-        num_gossip_runs=max_gossip_runs;
-
-        // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
-        // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
-        // 1 cycle: on the next message or view, we will start the task
-        if(stable_task != null)
-            return;
-        synchronized(stable_task_mutex) {
-            if(stable_task != null && stable_task.running()) {
-                return;  // already running
-            }
-            stable_task=new StableTask();
-            timer.add(stable_task, true); // fixed-rate scheduling
-        }
-        if(log.isTraceEnabled())
-            log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
-    }
-
-
-    void stopStableTask() {
-        // contrary to startStableTask(), we don't need double-checked locking here because this method is not
-        // called frequently
-        synchronized(stable_task_mutex) {
-            if(stable_task != null) {
-                stable_task.stop();
-                stable_task=null;
-            }
-        }
-    }
-
-
-    void startResumeTask(long max_suspend_time) {
-        max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
-        synchronized(resume_task_mutex) {
-            if(resume_task != null && resume_task.running()) {
-                return;  // already running
-            }
-            else {
-                resume_task=new ResumeTask(max_suspend_time);
-                timer.add(resume_task, true); // fixed-rate scheduling
-            }
-        }
-        if(log.isDebugEnabled())
-            log.debug("resume task started, max_suspend_time=" + max_suspend_time);
-    }
-
-
-    void stopResumeTask() {
-        synchronized(resume_task_mutex) {
-            if(resume_task != null) {
-                resume_task.stop();
-                resume_task=null;
-            }
-        }
-    }
-
-
-    void startStabilityTask(Digest d, long delay) {
-        synchronized(stability_mutex) {
-            if(stability_task != null && stability_task.running()) {
-                return;  // already running
-            }
-            else {
-                stability_task=new StabilitySendTask(d, delay);
-                timer.add(stability_task, true); // fixed-rate scheduling
-            }
-        }
-    }
-
-
-    void stopStabilityTask() {
-        synchronized(stability_mutex) {
-            if(stability_task != null) {
-                stability_task.stop();
-                stability_task=null;
-            }
-        }
-    }
-
-
-    /**
-     Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
-     <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
-     seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
-     message, which results in garbage collection of messages lower than the ones in the stability vector. The
-     maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
-     for details).
-     */
-    private void handleStableGossip(Address sender, Digest d) {
-        Address mbr;
-        long highest_seqno, my_highest_seqno;
-        long highest_seen_seqno, my_highest_seen_seqno;
-        boolean my_own_gossip=false;
-
-        if(d == null || sender == null) {
-            if(log.isErrorEnabled()) log.error("digest or sender is null");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isTraceEnabled()) {
-                log.trace("STABLE message will not be handled as suspended=" + suspended);
-            }
-            return;
-        }
-
-        if(log.isTraceEnabled())
-            log.trace(new StringBuffer("received digest from ").append(sender).append(": ").append(d));
-        if(!heard_from.contains(sender)) {  // already received gossip from sender; discard it
-            if(log.isTraceEnabled()) log.trace("already received gossip from " + sender);
-            return;
-        }
-
-        // we won't handle the gossip d, if d's members don't match the membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-        if(!this.digest.sameSenders(d)) {
-            if(log.isTraceEnabled()) {
-                log.trace("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
-                        this.digest + "): ignoring digest and re-initializing own digest");
-            }
-            initialize();
-            return;
-        }
-
-        my_own_gossip=local_addr != null && local_addr.equals(sender);
-
-        Map.Entry entry;
-        org.jgroups.protocols.pbcast.Digest.Entry val;
-        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            mbr=(Address)entry.getKey();
-            if(!digest.contains(mbr)) {
-                if(log.isTraceEnabled()) log.trace("sender " + mbr + " not found in stability vector");
-                continue;
-            }
-            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
-            highest_seqno=val.high_seqno;
-            highest_seen_seqno=val.high_seqno_seen;
-
-            // compute the minimum of the highest seqnos deliverable (for garbage collection)
-            my_highest_seqno=digest.highSeqnoAt(mbr);
-            if(my_highest_seqno < 0) {
-                if(highest_seqno >= 0 && my_own_gossip)
-                    digest.setHighSeqnoAt(mbr, highest_seqno);
-            }
-            else {
-                digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
-            }
-
-            // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
-            my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
-            if(my_highest_seen_seqno < 0) {
-                if(highest_seen_seqno >= 0)
-                    digest.setHighSeqnoSeenAt(mbr, highest_seen_seqno);
-            }
-            else {
-                digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
-            }
-        }
-
-        heard_from.removeElement(sender);
-        if(heard_from.size() == 0) {
-            if(log.isTraceEnabled()) log.trace("sending stability msg " + digest);
-            sendStabilityMessage(digest.copy());
-            initialize();
-        }
-    }
-
-
-    /**
-     * Bcasts a STABLE message to all group members. Message contains highest seqnos of all members
-     * seen by this member. Highest seqnos are retrieved from the NAKACK layer above.
-     */
-    void sendStableMessage() {
-        Digest d=null;
-        Message msg=new Message(); // mcast message
-        StableHeader hdr;
-
-        if(suspended) {
-            if(log.isTraceEnabled())
-                log.trace("will not send STABLE message as suspended=" + suspended);
-            return;
-        }
-
-        d=getDigest();
-        if(d != null && d.size() > 0) {
-            if(log.isTraceEnabled())
-                log.trace("mcasting STABLE msg, digest=" + d +
-                          " (num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
-            hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
-            msg.putHeader(name, hdr);
-            num_gossips++;
-            passDown(new Event(Event.MSG, msg));
-        }
-    }
-
-
-
-    Digest getDigest() {
-        Digest ret=null;
-        passDown(new Event(Event.GET_DIGEST_STABLE));
-        ret=(Digest)digest_promise.getResult(digest_timeout);
-        if(ret == null) {
-            if(log.isErrorEnabled())
-                log.error("digest could not be fetched from below " + "(timeout was " + digest_timeout + " msecs)");
-        }
-        return ret;
-    }
-
-
-    /**
-     Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
-     The reason for waiting a random amount of time is that, in the worst case, all members receive a
-     STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
-     STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
-     elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
-     waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
-     discard S2.
-     @param tmp A copy of te stability digest, so we don't need to copy it again
-     */
-    void sendStabilityMessage(Digest tmp) {
-        long delay;
-
-        if(timer == null) {
-            if(log.isErrorEnabled())
-                log.error("timer is null, cannot schedule stability message to be sent");
-            timer=stack != null ? stack.timer : null;
-            return;
-        }
-
-        // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
-        // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
-        // STABILITY msg at the same time
-        delay=Util.random(stability_delay);
-        startStabilityTask(tmp, delay);
-    }
-
-
-    void handleStabilityMessage(Digest d) {
-        if(d == null) {
-            if(log.isErrorEnabled()) log.error("stability vector is null");
-            return;
-        }
-
-        if(suspended) {
-            if(log.isDebugEnabled()) {
-                log.debug("STABILITY message will not be handled as suspended=" + suspended);
-            }
-            return;
-        }
-
-        if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
-        stopStabilityTask();
-
-        // we won't handle the gossip d, if d's members don't match the membership in my own digest,
-        // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-        if(!this.digest.sameSenders(d)) {
-            if(log.isDebugEnabled()) {
-                log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
-                        this.digest + "): ignoring digest and re-initializing own digest");
-            }
-            initialize();
-            return;
-        }
-
-        // pass STABLE event down the stack, so NAKACK can garbage collect old messages
-        passDown(new Event(Event.STABLE, d));
-    }
-
-
-
-    /* ------------------------------------End of Private Methods ------------------------------------- */
-
-
-
-
-
-
-
-    public static class StableHeader extends Header implements Streamable {
-        public static final int STABLE_GOSSIP=1;
-        public static final int STABILITY=2;
-
-        int type=0;
-        // Digest digest=new Digest();  // used for both STABLE_GOSSIP and STABILITY message
-        Digest stableDigest=null; // changed by Bela April 4 2004
-
-        public StableHeader() {
-        } // used for externalizable
-
-
-        public StableHeader(int type, Digest digest) {
-            this.type=type;
-            this.stableDigest=digest;
-        }
-
-
-        static String type2String(int t) {
-            switch(t) {
-                case STABLE_GOSSIP:
-                    return "STABLE_GOSSIP";
-                case STABILITY:
-                    return "STABILITY";
-                default:
-                    return "<unknown>";
-            }
-        }
-
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append('[');
-            sb.append(type2String(type));
-            sb.append("]: digest is ");
-            sb.append(stableDigest);
-            return sb.toString();
-        }
-
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(type);
-            if(stableDigest == null) {
-                out.writeBoolean(false);
-                return;
-            }
-            out.writeBoolean(true);
-            stableDigest.writeExternal(out);
-        }
-
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readInt();
-            boolean digest_not_null=in.readBoolean();
-            if(digest_not_null) {
-                stableDigest=new Digest();
-                stableDigest.readExternal(in);
-            }
-        }
-
-        public long size() {
-            long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
-            if(stableDigest != null)
-                retval+=stableDigest.serializedSize();
-            return retval;
-        }
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeInt(type);
-            Util.writeStreamable(stableDigest, out);
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readInt();
-            stableDigest=(Digest)Util.readStreamable(Digest.class, in);
-        }
-
-
-    }
-
-
-
-
-    /**
-     Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
-     However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
-     stable_send task terminates only after a period of time within which no messages were either sent
-     or received
-     */
-    private class StableTask implements TimeScheduler.Task {
-        boolean stopped=false;
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean running() { // syntactic sugar
-            return !stopped;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-        public long nextInterval() {
-            long interval=computeSleepTime();
-            if(interval <= 0)
-                return 10000;
-            else
-                return interval;
-        }
-
-
-        public void run() {
-            if(suspended) {
-                if(log.isTraceEnabled())
-                    log.trace("stable task will not run as suspended=" + suspended);
-                stopStableTask();
-                return;
-            }
-            initialize();
-            sendStableMessage();
-            num_gossip_runs--;
-            if(num_gossip_runs <= 0) {
-                if(log.isTraceEnabled())
-                    log.trace("stable task terminating (num_gossip_runs=" +
-                              num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
-                stopStableTask();
-            }
-        }
-
-        long computeSleepTime() {
-            return getRandom((mbrs.size() * desired_avg_gossip * 2));
-        }
-
-        long getRandom(long range) {
-            return (long)((Math.random() * range) % range);
-        }
-    }
-
-
-
-
-
-    /**
-     * Multicasts a STABILITY message.
-     */
-    private class StabilitySendTask implements TimeScheduler.Task {
-        Digest   d=null;
-        boolean  stopped=false;
-        long     delay=2000;
-
-
-        public StabilitySendTask(Digest d, long delay) {
-            this.d=d;
-            this.delay=delay;
-        }
-
-        public boolean running() {
-            return !stopped;
-        }
-
-        public void stop() {
-            stopped=true;
-        }
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-
-        /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
-        public long nextInterval() {
-            return delay;
-        }
-
-
-        public void run() {
-            Message msg;
-            StableHeader hdr;
-
-            if(suspended) {
-                if(log.isDebugEnabled()) {
-                    log.debug("STABILITY message will not be sent as suspended=" + suspended);
-                }
-                stopped=true;
-                return;
-            }
-
-            if(d != null && !stopped) {
-                msg=new Message();
-                hdr=new StableHeader(StableHeader.STABILITY, d);
-                msg.putHeader(STABLE.name, hdr);
-                passDown(new Event(Event.MSG, msg));
-                d=null;
-            }
-            stopped=true; // run only once
-        }
-    }
-
-
-    private class ResumeTask implements TimeScheduler.Task {
-        boolean running=true;
-        long max_suspend_time=0;
-
-        ResumeTask(long max_suspend_time) {
-            this.max_suspend_time=max_suspend_time;
-        }
-
-        void stop() {
-            running=false;
-        }
-
-        public boolean running() {
-            return running;
-        }
-
-        public boolean cancelled() {
-            return running == false;
-        }
-
-        public long nextInterval() {
-            return max_suspend_time;
-        }
-
-        public void run() {
-            if(suspended)
-                log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
-                         "check why this event was not received (or increase max_suspend_time for large state transfers)");
-            resume();
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
deleted file mode 100644
index 1fb7667..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: STATE_TRANSFER.java,v 1.25 2005/12/16 16:21:07 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.stack.StateTransferInfo;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't
- * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if
- * null). That member makes a copy D of its current digest and asks the application for a copy of
- * its current state S. Then the member returns both S and D to the requester. The requester
- * first sets its digest to D and then returns the state to the application.
- * @author Bela Ban
- */
-public class STATE_TRANSFER extends Protocol  {
-    Address        local_addr=null;
-    final Vector   members=new Vector();
-    long           state_id=1;  // used to differentiate between state transfers (not currently used)
-    final List     state_requesters=new List(); // requesters of state (usually just 1, could be more)
-    Digest         digest=null;
-    final HashMap  map=new HashMap(); // to store configuration information
-    long           start, stop; // to measure state transfer time
-    int            num_state_reqs=0;
-    long           num_bytes_sent=0;
-    double         avg_state_size=0;
-    final static   String name="STATE_TRANSFER";
-
-
-    /** All protocol names have to be unique ! */
-    @Override // GemStoneAddition
-    public String getName() {
-        return name;
-    }
-
-    public int getNumberOfStateRequests() {return num_state_reqs;}
-    public long getNumberOfStateBytesSent() {return num_bytes_sent;}
-    public double getAverageStateSize() {return avg_state_size;}
-
-    @Override // GemStoneAddition
-    public Vector requiredDownServices() {
-        Vector retval=new Vector();
-        retval.addElement(Integer.valueOf(Event.GET_DIGEST_STATE));
-        retval.addElement(Integer.valueOf(Event.SET_DIGEST));
-        return retval;
-    }
-
-    @Override // GemStoneAddition
-    public void resetStats() {
-        super.resetStats();
-        num_state_reqs=0;
-        num_bytes_sent=0;
-        avg_state_size=0;
-    }
-
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        super.setProperties(props);
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.STATE_TRANSFER_STATE_TRANSFERSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
-            return false;
-        }
-        return true;
-    }
-
-    @Override // GemStoneAddition
-    public void init() throws Exception {
-        map.put("state_transfer", Boolean.TRUE);
-        map.put("protocol_class", getClass().getName());
-    }
-
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        passUp(new Event(Event.CONFIG, map));
-    }
-
-
-    @Override // GemStoneAddition
-    public void up(Event evt) {
-        Message     msg;
-        StateHeader hdr;
-
-        switch(evt.getType()) {
-
-        case Event.BECOME_SERVER:
-            break;
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-
-        case Event.TMP_VIEW:
-        case Event.VIEW_CHANGE:
-            handleViewChange((View)evt.getArg());
-            break;
-
-        case Event.GET_DIGEST_STATE_OK:
-            synchronized(state_requesters) {
-                if(digest != null) {
-                    if(warn)
-                        log.warn("GET_DIGEST_STATE_OK: existing digest is not null, overwriting it !");
-                }
-                digest=(Digest)evt.getArg();
-                if(log.isDebugEnabled())
-                    log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)");
-                passUp(new Event(Event.GET_APPLSTATE));
-            }
-            return;
-
-        case Event.MSG:
-            msg=(Message)evt.getArg();
-            if(!(msg.getHeader(name) instanceof StateHeader))
-                break;
-
-            hdr=(StateHeader)msg.removeHeader(name);
-            switch(hdr.type) {
-            case StateHeader.STATE_REQ:
-                handleStateReq(hdr.sender);
-                break;
-            case StateHeader.STATE_RSP:
-                handleStateRsp(hdr.sender, hdr.my_digest, msg.getBuffer());
-                break;
-            default:
-                if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_TYPE__0__NOT_KNOWN_IN_STATEHEADER, hdr.type);
-                break;
-            }
-            return;
-        }
-        passUp(evt);
-    }
-
-
-
-    @Override // GemStoneAddition
-    public void down(Event evt) {
-        byte[] state;
-        Address target, requester;
-        StateTransferInfo info;
-        StateHeader hdr;
-        Message state_req, state_rsp;
-
-        switch(evt.getType()) {
-
-            case Event.TMP_VIEW:
-            case Event.VIEW_CHANGE:
-                handleViewChange((View)evt.getArg());
-                break;
-
-            // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented
-            case Event.GET_STATE:
-                info=(StateTransferInfo)evt.getArg();
-                if(info.type != StateTransferInfo.GET_FROM_SINGLE) {
-                    if(warn) log.warn("[GET_STATE] (info=" + info + "): getting the state from " +
-                            "all members is not currently supported by pbcast.STATE_TRANSFER, will use " +
-                            "coordinator to fetch state instead");
-                }
-                if(info.target == null) {
-                    target=determineCoordinator();
-                }
-                else {
-                    target=info.target;
-                    if(target.equals(local_addr)) {
-                        if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_GET_STATE_CANNOT_FETCH_STATE_FROM_MYSELF_);
-                        target=null;
-                    }
-                }
-                if(target == null) {
-                    if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)");
-                    passUp(new Event(Event.GET_STATE_OK, null));
-                }
-                else {
-                    state_req=new Message(target, null, null);
-                    state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null));
-                    if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state");
-
-                    // suspend sending and handling of mesage garbage collection gossip messages,
-                    // fixes bugs #943480 and #938584). Wake up when state has been received
-                    if(log.isDebugEnabled())
-                        log.debug("passing down a SUSPEND_STABLE event");
-                    passDown(new Event(Event.SUSPEND_STABLE, Long.valueOf(info.timeout)));
-
-                    start=System.currentTimeMillis();
-                    passDown(new Event(Event.MSG, state_req));
-                }
-                return;                 // don't pass down any further !
-
-            case Event.GET_APPLSTATE_OK:
-                state=(byte[])evt.getArg();
-                synchronized(state_requesters) {
-                    if(state_requesters.size() == 0) {
-                        if(warn)
-                            log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !");
-                        return;
-                    }
-                    if(digest == null) { // GemStoneAddition: missing braces
-                        if(warn) log.warn("GET_APPLSTATE_OK: received application state, " +
-                                "but there is no digest !");
-                    }
-                    else
-                        digest=digest.copy();
-                    if(stats) {
-                        num_state_reqs++;
-                        if(state != null)
-                            num_bytes_sent+=state.length;
-                        avg_state_size=(double)/*GemStoneAddition*/num_bytes_sent / num_state_reqs;
-                    }
-                    for(Enumeration e=state_requesters.elements(); e.hasMoreElements();) {
-                        requester=(Address)e.nextElement();
-                        state_rsp=new Message(requester, null, state); // put the state into state_rsp.buffer
-                        hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest);
-                        state_rsp.putHeader(name, hdr);
-                        passDown(new Event(Event.MSG, state_rsp));
-                    }
-                    digest=null;
-                    state_requesters.removeAll();
-                }
-                return;                 // don't pass down any further !
-        }
-
-        passDown(evt);              // pass on to the layer below us
-    }
-
-
-
-
-
-
-
-
-
-    /* --------------------------- Private Methods -------------------------------- */
-
-
-    /** Return the first element of members which is not me. Otherwise return null. */
-    private Address determineCoordinator() {
-        Address ret=null;
-        synchronized(members) {
-            if(/*members != null && GemStoneADdition (cannot be null) */ members.size() > 1) {
-                for(int i=0; i < members.size(); i++)
-                    if(!local_addr.equals(members.elementAt(i)))
-                        return (Address)members.elementAt(i);
-            }
-        }
-        return ret;
-    }
-
-
-    private void handleViewChange(View v) {
-        Vector new_members=v.getMembers();
-        synchronized(members) {
-            members.clear();
-            members.addAll(new_members);
-        }
-    }
-
-    /**
-     * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but
-     * instead we just add the sender to the requester list so it will receive the same state when done. If not,
-     * we add the sender to the requester list and send a GET_APPLSTATE event up.
-     */
-    private void handleStateReq(Object sender) {
-        if(sender == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_SENDER_IS_NULL_);
-            return;
-        }
-
-        synchronized(state_requesters) {
-            if(state_requesters.size() > 0) {  // state transfer is in progress, digest was requested
-                state_requesters.add(sender);
-            }
-            else {
-                state_requesters.add(sender);
-                digest=null;
-                if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE");
-                passDown(new Event(Event.GET_DIGEST_STATE));
-            }
-        }
-    }
-
-
-    /** Set the digest and the send the state up to the application */
-    void handleStateRsp(Object sender, Digest digest, byte[] state) {
-        if(digest == null) {
-            if(warn)
-                log.warn("digest received from " + sender + " is null, skipping setting digest !");
-        }
-        else
-            passDown(new Event(Event.SET_DIGEST, digest)); // set the digest (e.g. in NAKACK)
-        stop=System.currentTimeMillis();
-
-        // resume sending and handling of mesage garbage collection gossip messages,
-        // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
-        // collection protocol (e.g. STABLE)
-        if(log.isDebugEnabled())
-            log.debug("passing down a RESUME_STABLE event");
-        passDown(new Event(Event.RESUME_STABLE));
-
-        if(state == null) {
-            if(warn)
-                log.warn("state received from " + sender + " is null, will return null state to application");
-        }
-        else
-            log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds");
-        passUp(new Event(Event.GET_STATE_OK, state));
-    }
-
-
-    /* ------------------------ End of Private Methods ------------------------------ */
-
-
-
-    /**
-     * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em
-     * be stored in the header itself, but in the message's buffer.
-     *
-     */
-    public static class StateHeader extends Header implements Streamable {
-        public static final byte STATE_REQ=1;
-        public static final byte STATE_RSP=2;
-
-
-        long id=0;          // state transfer ID (to separate multiple state transfers at the same time)
-        byte type=0;
-        Address sender=null;   // sender of state STATE_REQ or STATE_RSP
-        Digest my_digest=null;   // digest of sender (if type is STATE_RSP)
-
-
-        public StateHeader() {
-        } // for externalization
-
-
-        public StateHeader(byte type, Address sender, long id, Digest digest) {
-            this.type=type;
-            this.sender=sender;
-            this.id=id;
-            this.my_digest=digest;
-        }
-
-        public int getType() {
-            return type;
-        }
-
-        public Digest getDigest() {
-            return my_digest;
-        }
-
-
-        @Override // GemStoneAddition
-        public boolean equals(Object o) {
-            StateHeader other;
-
-            if(sender != null && o != null) {
-                if(!(o instanceof StateHeader))
-                    return false;
-                other=(StateHeader)o;
-                return sender.equals(other.sender) && id == other.id;
-            }
-            return false;
-        }
-
-
-        @Override // GemStoneAddition
-        public int hashCode() {
-            if(sender != null)
-                return sender.hashCode() + (int)id;
-            else
-                return (int)id;
-        }
-
-
-        @Override // GemStoneAddition
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append("[StateHeader: type=" + type2Str(type));
-            if(sender != null) sb.append(", sender=" + sender + " id=#" + id);
-            if(my_digest != null) sb.append(", digest=" + my_digest);
-            return sb.toString();
-        }
-
-
-        static String type2Str(int t) {
-            switch(t) {
-                case STATE_REQ:
-                    return "STATE_REQ";
-                case STATE_RSP:
-                    return "STATE_RSP";
-                default:
-                    return "<unknown>";
-            }
-        }
-
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(sender);
-            out.writeLong(id);
-            out.writeByte(type);
-            out.writeObject(my_digest);
-        }
-
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            sender=(Address)in.readObject();
-            id=in.readLong();
-            type=in.readByte();
-            my_digest=(Digest)in.readObject();
-        }
-
-
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeByte(type);
-            out.writeLong(id);
-            Util.writeAddress(sender, out);
-            Util.writeStreamable(my_digest, out);
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readByte();
-            id=in.readLong();
-            sender=Util.readAddress(in);
-            my_digest=(Digest)Util.readStreamable(Digest.class, in);
-        }
-
-        @Override // GemStoneAddition
-        public long size(short version) {
-            long retval=Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
-
-            retval+=Util.size(sender,version);
-
-            retval+=Global.BYTE_SIZE; // presence byte for my_digest
-            if(my_digest != null)
-                retval+=my_digest.serializedSize(version);
-
-            return retval;
-        }
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
deleted file mode 100644
index e3c9af5..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
+++ /dev/null
@@ -1,5 +0,0 @@
-<HTML>
-	<BODY>
-		Supports probabilistic broadcasts.
-	</BODY>
-</HTML>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
deleted file mode 100644
index 85e774b..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-//$Id: RingNode.java,v 1.2 2004/03/30 06:47:20 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.ring;
-
-import com.gemstone.org.jgroups.stack.IpAddress;
-
-import java.util.Vector;
-
-public interface RingNode
-{
-   Object receiveToken(int timeout) throws TokenLostException;
-
-   Object receiveToken() throws TokenLostException;
-
-   void passToken(Object token) throws TokenLostException;
-
-   IpAddress getTokenReceiverAddress();
-
-   void reconfigure(Vector newMembers);
-
-   void tokenArrived(Object token);
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
deleted file mode 100644
index d955acf..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-//$Id: RingNodeFlowControl.java,v 1.4 2005/08/08 12:45:41 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.ring;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-
-
-public class RingNodeFlowControl
-{
-   final int initialWindow;
-   final float windowReduceFactor;
-   final int belowThresholdAdvanceAmount;
-   final float aboveThresholdAdvanceAmount;
-   private int memberCount;
-   private int previousBacklog;
-   private int backlog;
-    protected final GemFireTracer log=GemFireTracer.getLog(this.getClass());
-
-   public RingNodeFlowControl(int initialWindow,
-                              float windowReduceFactor,
-                              int belowThresholdAdvanceAmount,
-                              float aboveThresholdAdvanceAmount)
-   {
-      this.initialWindow = initialWindow;
-      this.windowReduceFactor = windowReduceFactor;
-      this.belowThresholdAdvanceAmount = belowThresholdAdvanceAmount;
-      this.aboveThresholdAdvanceAmount = aboveThresholdAdvanceAmount;
-   }
-
-   public RingNodeFlowControl()
-   {
-      this(20, 0.7F, 3, 1.0F);
-   }
-
-   public void invalidate()
-   {
-      previousBacklog = backlog = 0;
-   }
-
-   public int getBacklog()
-   {
-      return backlog;
-   }
-
-   public void setBacklog(int backlog)
-   {
-      if(backlog <0)
-      throw new IllegalArgumentException("backlog value has to be positive");
-      this.backlog = backlog;
-   }
-
-   public int getBacklogDifference()
-   {
-      return backlog - previousBacklog;
-   }
-
-   public int getPreviousBacklog()
-   {
-      return previousBacklog;
-   }
-
-   public void setPreviousBacklog()
-   {
-      this.previousBacklog = backlog;
-   }
-
-   public void viewChanged(int memberCount)
-   {
-      this.memberCount = memberCount;
-   }
-
-   public int getAllowedToBroadcast(RingToken token)
-   {
-      int fairWindowShare = 0;
-      int windowSize = token.getWindowSize();
-      if (memberCount == 0) memberCount = 1;
-      int maxMessages = (windowSize / memberCount);
-      if (maxMessages < 1)
-         maxMessages = 1;
-
-      int backlogAverage = token.getBacklog() + backlog - previousBacklog;
-      if (backlogAverage > 0)
-      {
-         fairWindowShare = windowSize * backlog / backlogAverage;
-      }
-      fairWindowShare = (fairWindowShare < 1)?1: fairWindowShare;
-
-
-      int maxAllowed = windowSize - token.getLastRoundBroadcastCount();
-      if (maxAllowed < 1)
-         maxAllowed = 0;
-
-
-         if(log.isInfoEnabled()) log.info("fairWindowShare=" + fairWindowShare + " maxMessages="
-                    + maxMessages + " maxAllowed=" + maxAllowed);
-
-      return (fairWindowShare < maxAllowed)?Math.min(fairWindowShare, maxMessages):Math.min(maxAllowed, maxMessages);
-   }
-
-   public void updateWindow(RingToken token)
-   {
-      int threshold = token.getWindowThreshold();
-      int window = token.getWindowSize();
-      if (window < initialWindow)
-      {
-         window = initialWindow;
-      }
-
-      boolean congested =(token.getRetransmissionRequests().size() > 0);
-
-      if (congested)
-      {
-         threshold = (int) (window * windowReduceFactor);
-         window = initialWindow;
-      }
-      else
-      {
-         if (window < threshold)
-         {
-            window += belowThresholdAdvanceAmount;
-         }
-         else
-         {
-            window += aboveThresholdAdvanceAmount;
-         }
-      }
-      token.setWindowSize(window);
-      token.setWindowThreshold(threshold);
-   }
-
-}


Mime
View raw message