geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [29/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
Date Fri, 21 Aug 2015 21:22:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FC.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FC.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FC.java
deleted file mode 100644
index 93210a6..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FC.java
+++ /dev/null
@@ -1,1025 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: FC.java,v 1.50 2005/10/28 14:46:49 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Vector;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Global;
-import com.gemstone.org.jgroups.Header;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.View;
-import java.util.concurrent.ConcurrentHashMap;
-//import com.gemstone.org.jgroups.protocols.pbcast.GMS;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.BoundedList;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Streamable;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-/**
- * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes
- * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of
- * how many credits it has received from a sender. When credits for a sender fall below a threshold,
- * the receiver sends more credits to the sender. Works for both unicast and multicast messages.
- * <p>
- * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this
- * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down().
- * <br/>This is the second simplified implementation of the same model. The algorithm is sketched out in
- * doc/FlowControl.txt
- * @author Bela Ban
- * @version $Revision: 1.50 $
- */
-public class FC extends Protocol  {
-  
-    /** HashMap key=Address, value=Long: keys are members, values are credits left. For each send, the
-     * number of credits is decremented by the message size */
-    final Map sent=new HashMap(11);
-    // final Map sent=new ConcurrentHashMap(11);
-
-    /* Throttle request from the receivers; key receivers, values throttle(sleep) time */
-    final Map throttle=new HashMap(11);
-
-    /** HashMap key=Address, value=Long: keys are members, values are credits left (in bytes).
-     * For each receive, the credits for the sender are decremented by the size of the received message.
-     * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT
-     * is received after reaching <tt>min_credits</tt> credits. */
-    final Map received=new ConcurrentHashMap(11);
-    // final Map received=new ConcurrentHashMap(11);
-
-
-    /** List of members from whom we expect credits */
-    final List creditors=new ArrayList(11);
-
-    /** Max number of bytes to send per receiver until an ack must
-     * be received before continuing sending */
-    private long max_credits=50000;
-    private Long max_credits_constant;
-
-    /** Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send
-     * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to
-     * wait forever.
-     */
-    private long max_block_time=5000;
-
-    /** If credits fall below this limit, we send more credits to the sender. (We also send when
-     * credits are exhausted (0 credits left)) */
-    private double min_threshold=0.25;
-
-    /** Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will
-     * override the above computation */
-    private long min_credits;
-
-    /** Whether FC is still running, this is set to false when the protocol terminates (on stop()) */
-    private boolean running=true;
-
-    /** Determines whether or not to block on down(). Set when not enough credit is available to send a message
-     * to all or a single member */
-    private boolean insufficient_credit;
-
-    /** the lowest credits of any destination (sent_msgs) */
-    private long lowest_credit=max_credits;
-
-    /** Mutex to block on down() */
-    volatile Object mutex = new Object();
-
-    static final String name="FC";
-
-//    private long start_blocking;
-
-    private int num_blockings;
-    private int num_credit_requests_received, num_credit_requests_sent;
-    private int num_credit_responses_sent, num_credit_responses_received;
-    private long total_time_blocking;
-
-    private long mcast_throttle_time;
-    private Object mcast_throttle_mutex = new Object();
-    
-    final BoundedList last_blockings=new BoundedList(50);
-    
-    final Map throttledSenders = new HashMap();
-    
-    
-//    Address coordinator; // current view coordinator GemStoneAddition (omitted)
-    
-//    private Address local_addr= null; // GemStoneAddition
-
-//    final static FcHeader REPLENISH_HDR=new FcHeader(FcHeader.REPLENISH);
-//    final static FcHeader CREDIT_REQUEST_HDR=new FcHeader(FcHeader.CREDIT_REQUEST);
-
-    @Override // GemStoneAddition
-    public final String getName() {
-        return name;
-    }
-
-    // start GemStoneAddition
-    @Override // GemStoneAddition
-    public int getProtocolEnum() {
-      return com.gemstone.org.jgroups.stack.Protocol.enumFC;
-    }
-    // end GemStone addition
-
-
-    @Override // GemStoneAddition
-    public void resetStats() {
-        super.resetStats();
-        num_blockings=0;
-        num_credit_responses_sent=num_credit_responses_received=num_credit_requests_received=num_credit_requests_sent=0;
-        total_time_blocking=0;
-        last_blockings.removeAll();
-    }
-
-    public long getMaxCredits() {
-        return max_credits;
-    }
-
-    public void setMaxCredits(long max_credits) {
-        this.max_credits=max_credits;
-        max_credits_constant=Long.valueOf(this.max_credits);
-    }
-
-    public double getMinThreshold() {
-        return min_threshold;
-    }
-
-    public void setMinThreshold(double min_threshold) {
-        this.min_threshold=min_threshold;
-    }
-
-    public long getMinCredits() {
-        return min_credits;
-    }
-
-    public void setMinCredits(long min_credits) {
-        this.min_credits=min_credits;
-    }
-
-    public boolean isBlocked() {
-        return insufficient_credit;
-    }
-
-    public int getNumberOfBlockings() {
-        return num_blockings;
-    }
-
-    public long getMaxBlockTime() {
-        return max_block_time;
-    }
-
-    public void setMaxBlockTime(long t) {
-        max_block_time=t;
-    }
-
-    public long getTotalTimeBlocked() {
-        return total_time_blocking;
-    }
-
-    public double getAverageTimeBlocked() {
-        return num_blockings == 0? 0.0 : total_time_blocking / (double)num_blockings;
-    }
-
-    public int getNumberOfCreditRequestsReceived() {
-        return num_credit_requests_received;
-    }
-
-    public int getNumberOfCreditRequestsSent() {
-        return num_credit_requests_sent;
-    }
-
-    public int getNumberOfCreditResponsesReceived() {
-        return num_credit_responses_received;
-    }
-
-    public int getNumberOfCreditResponsesSent() {
-        return num_credit_responses_sent;
-    }
-
-    public String printSenderCredits() {
-        return printMap(sent);
-    }
-
-    public String printReceiverCredits() {
-        return printMap(received);
-    }
-
-    public String printCredits() {
-        StringBuffer sb=new StringBuffer();
-        sb.append("senders:\n").append(printMap(sent)).append("\n\nreceivers:\n").append(printMap(received));
-        return sb.toString();
-    }
-
-    @Override // GemStoneAddition
-    public Map dumpStats() {
-        Map retval=super.dumpStats();
-        if(retval == null)
-            retval=new HashMap();
-        retval.put("senders", printMap(sent));
-        retval.put("receivers", printMap(received));
-        retval.put("num_blockings", Integer.valueOf(this.num_blockings));
-        retval.put("avg_time_blocked", Double.valueOf(getAverageTimeBlocked()));
-        retval.put("num_replenishments", Integer.valueOf(this.num_credit_responses_received));
-        retval.put("total_time_blocked", Long.valueOf(total_time_blocking));
-        return retval;
-    }
-
-    public String showLastBlockingTimes() {
-        return last_blockings.toString();
-    }
-
-
-
-    /** Allows to unblock a blocked sender from an external program, e.g. JMX */
-    @SuppressFBWarnings(value="IL_INFINITE_RECURSIVE_LOOP", justification="the code is correct")
-    public void unblock() {
-      Object mux = mutex;
-        synchronized(mux) {
-          if (mutex != mux) { // GemFire bug 40243
-            unblock();
-            return;
-          }
-            if(trace)
-                log.trace("unblocking the sender and replenishing all members, creditors are " + creditors);
-
-            Map.Entry entry;
-            for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
-                entry=(Map.Entry)it.next();
-                entry.setValue(max_credits_constant);
-            }
-
-            lowest_credit=computeLowestCredit(sent);
-            creditors.clear();
-            insufficient_credit=false;
-            mux.notifyAll();
-        }
-    }
-
-
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String  str;
-        boolean min_credits_set=false;
-
-        super.setProperties(props);
-        str=props.getProperty("max_credits");
-        if(str != null) {
-            max_credits=Long.parseLong(str);
-            props.remove("max_credits");
-        }
-
-        str=props.getProperty("min_threshold");
-        if(str != null) {
-            min_threshold=Double.parseDouble(str);
-            props.remove("min_threshold");
-        }
-
-        str=props.getProperty("min_credits");
-        if(str != null) {
-            min_credits=Long.parseLong(str);
-            props.remove("min_credits");
-            min_credits_set=true;
-        }
-
-        if(!min_credits_set)
-            min_credits=(long)(/*(double) GemStoneAddition */max_credits * min_threshold);
-
-        str=props.getProperty("max_block_time");
-        if(str != null) {
-            max_block_time=Long.parseLong(str);
-            props.remove("max_block_time");
-        }
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.FC_FCSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-            return false;
-        }
-        max_credits_constant=Long.valueOf(max_credits);
-        return true;
-    }
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        super.start();
-        Object mux = mutex;
-        synchronized(mux) {
-          if (mutex != mux) { // GemFire bug 40243
-            start();
-            return;
-          }
-            running=true;
-            insufficient_credit=false;
-            lowest_credit=max_credits;
-        }
-    }
-
-    @Override // GemStoneAddition
-    public void stop() {
-        super.stop();
-        Object mux = mutex;
-        synchronized(mux) {
-          if (mutex != mux) { // GemFire bug 40243
-            stop();
-            return;
-          }
-            running=false;
-            mutex.notifyAll();
-        }
-    }
-
-
-    /**
-     * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g.
-     * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock !
-     * @param evt
-     */
-//    protected void receiveDownEvent(Event evt) {
-//        if(evt.getType() == Event.VIEW_CHANGE) {
-//            View v=(View)evt.getArg();
-//            Vector mbrs=v.getMembers();
-//            handleViewChange(mbrs);
-//        }
-//        super.receiveDownEvent(evt);
-//    }
-
-    @Override // GemStoneAddition
-    public void down(Event evt) {
-        switch(evt.getType()) {
-        case Event.MSG:
-            handleDownMessage(evt);
-            return;
-        case Event.VIEW_CHANGE: // GemStoneAddition - moved from receiveDownEvent
-          View v=(View)evt.getArg();
-          Vector mbrs=v.getMembers();
-          handleViewChange(mbrs);
-          break;
-        }
-        passDown(evt); // this could potentially use the lower protocol's thread which may block
-    }
-
-
-
-
-    @Override // GemStoneAddition
-    public void up(Event evt) {
-        switch(evt.getType()) {
-
-//            case Event.SET_LOCAL_ADDRESS: // GemStoneAddition
-//              local_addr = (Address)evt.getArg();
-//              break;
-
-            case Event.MSG:
-                Message msg=(Message)evt.getArg();
-                FcHeader hdr=(FcHeader)msg.removeHeader(name);
-                if(hdr != null) {
-                    Address sender=msg.getSrc();
-                    switch(hdr.type) {
-                    case FcHeader.REPLENISH:
-                      if (log.isTraceEnabled())
-                        log.trace("(FC) received REPLENISH from "+sender+" with " + hdr.getBalance());
-                        num_credit_responses_received++;
-                        handleCredit(msg.getSrc(), hdr.getBalance()); // GemStoneAddition
-                        break;
-                    case FcHeader.THROTTLE:
-                      if (log.isTraceEnabled())
-                        log.trace("(FC) received THROTTLE request from "+sender+" with " + hdr.getBalance());
-                      handleThrottleRequest(msg.getSrc(), hdr.getBalance()); // GemStoneAddition
-                      break;
-                    case FcHeader.WAIT: // GemStoneAddition
-                      if (log.isTraceEnabled())
-                        log.trace("(FC) received WAIT request from "+sender+". Responding with credit request");
-                        sendCreditRequest(msg.getSrc());
-                        break;
-                    case FcHeader.CREDIT_REQUEST:
-                        /*
-                        if (stack.jgmm.isSerialQueueThrottled(sender)) { // GemStoneAddition
-                          sendNoCredit(sender);
-                          stack.gemfireStats.incJg3(1);
-                          break;
-                        }*/
-                        
-                        num_credit_requests_received++;
-                        
-                        long balance = ((Long)received.get(sender)).longValue(); // GemStoneAddition
-                        received.put(sender, max_credits_constant);
-                        if (log.isTraceEnabled())
-                          log.trace("(FC) received credit request from "+sender+": sending "+(max_credits-balance)+" credits");
-                        sendCredit(sender, max_credits - balance);
-                        stack.gfPeerFunctions.incFlowControlResponses(); // GemStoneAddition
-                        break;
-                    default:
-                        log.error(ExternalStrings.FC_HEADER_TYPE__0__NOT_KNOWN, hdr.type);
-                        break;
-                    }
-                    return; // don't pass message up
-                }
-                else {
-                    adjustCredit(msg);
-                }
-
-                break;
-
-        case Event.VIEW_CHANGE:
-            View newView = (View)evt.getArg();
-//            this.coordinator = newView.getCreator(); GemStoneAddition
-            handleViewChange(newView.getMembers());
-            break;
-        }
-        passUp(evt);
-    }
-
-
-    @SuppressFBWarnings(value="IMSE_DONT_CATCH_IMSE", justification="the code is for a hotspot bug")
-    private void handleDownMessage(Event evt) {
-      boolean requestSent = false;
-        Message msg=(Message)evt.getArg();
-        if (msg.isHighPriority || Thread.currentThread().getName().startsWith("UDP")) {
-          passDown(evt);
-          return;
-        }
-        int     length=msg.getLength();
-        Address dest=msg.getDest();
-        long blockStartTime = 0; // GemStoneAddition - statistics
-        
-        // See if there was a throttle request from the receiever.
-        throttleOnReceiver(dest); // GemStoneAddition
-        try {
-          Object mux = mutex;
-          synchronized(mux) {
-            if (mux != mutex) { // GemFire bug 40243 hit in another thread
-              passDown(evt);
-              return;
-            }
-            if(lowest_credit <= length) {
-              stack.gfPeerFunctions.incJgFCsendBlocks(1);
-              determineCreditors(dest, length);
-              insufficient_credit=true;
-              num_blockings++;
-              blockStartTime = stack.gfPeerFunctions.startFlowControlWait(); // GemStoneAddition - statistics
-              long start_blocking=System.currentTimeMillis();
-              boolean warned = false;
-              boolean shunned = false;
-              while(insufficient_credit && running
-                  && creditors.size() > 0  /* GemStoneAddition */) {
-                try {mux.wait(max_block_time);} catch(InterruptedException e) {
-                  Thread.currentThread().interrupt(); // GemStoneAddition
-                  break; //  GemStoneAddition
-                }
-                if(insufficient_credit && running) {
-                  stack.gfPeerFunctions.incJgFCautoRequests(1);
-                  int secs = stack.gfPeerFunctions.getAckWaitThreshold();
-                  long elapsed =System.currentTimeMillis()-start_blocking;
-                  if (elapsed > secs * 1000) {
-                    if (!warned) {
-                      warned = true;
-                      log.getLogWriter().warning(
-                          ExternalStrings.FC_FLOW_CONTROL_HAS_BLOCKED_FOR_MORE_THAN_0_SECONDS_WAITING_FOR_REPLENISHMENT_FROM_1,
-                          new Object[] {Long.valueOf(elapsed/1000), creditors});
-                    }
-                    else {
-                      secs = stack.gfPeerFunctions.getAckSevereAlertThreshold();
-                      if (secs > 0 && !shunned && elapsed > secs * 1000) {
-                        shunned = true;
-                        //warned = false; // allow another warning so we can see if shunning worked
-                        /*for (Iterator it=creditors.iterator(); it.hasNext(); ) {
-                                Address badmbr = (Address)it.next();
-                                Message shun = new Message();
-                                shun.setDest(this.coordinator);
-                                GMS gms = (GMS)stack.findProtocol("GMS");
-                                shun.putHeader(gms.getName(), new GMS.GmsHeader(
-                                    GMS.GmsHeader.REMOVE_REQ, badmbr));
-                                passDown(new Event(Event.MSG, shun));
-                              }*/
-                      }
-                    }
-                  }
-                  if (!requestSent) { // UNICAST will retransmit if someone doesn't get the message
-                    requestSent = true;
-                    for(int i=0; i < creditors.size(); i++) {
-                      sendCreditRequest((Address)creditors.get(i));
-                    }
-                  }
-                }
-              }
-              if (warned) {
-                log.getLogWriter().warning(
-                    ExternalStrings.FC_FLOW_CONTROL_WAS_UNBLOCKED_AFTER_WAITING_0_SECONDS,
-                    Long.valueOf((System.currentTimeMillis()-start_blocking)/1000));
-              }
-              stack.gfPeerFunctions.endFlowControlWait(blockStartTime);
-              //stop_blocking=System.currentTimeMillis();
-              //long block_time=stop_blocking - start_blocking;
-              //if(trace)
-              //    log.trace("total time blocked: " + block_time + " ms");
-              //total_time_blocking+=block_time;
-              //last_blockings.add(Long.valueOf(block_time));
-            }
-            else {
-              long tmp=decrementCredit(sent, dest, length);
-              if(tmp != -1)
-                lowest_credit=Math.min(tmp, lowest_credit);
-            }
-          }
-        } catch (IllegalMonitorStateException e) {
-          // GemFire bug 40243 is a problem with the hotspot compiler corrupting the
-          // lock of an object.  In this case, it's the mutex object and we
-          // can replace it & have another go
-          mutex = new Object();
-        }
-
-        // send message - either after regular processing, or after blocking (when enough credits available again)
-        passDown(evt);
-        
-    }
-
-    /**
-     * Checks whether one member (unicast msg) or all members (multicast msg) have enough credits. Add those
-     * that don't to the creditors list
-     * @param dest
-     * @param length
-     */
-    private void determineCreditors(Address dest, int length) {
-        boolean multicast=dest == null || dest.isMulticastAddress();
-        Address mbr;
-        Long    credits;
-        if(multicast) {
-            Map.Entry entry;
-            for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {
-                entry=(Map.Entry)it.next();
-                mbr=(Address)entry.getKey();
-                credits=(Long)entry.getValue();
-                if(credits.longValue() <= length) {
-                    if(!creditors.contains(mbr))
-                        creditors.add(mbr);
-                }
-            }
-        }
-        else {
-            credits=(Long)sent.get(dest);
-            if(credits != null && credits.longValue() <= length) {
-                if(!creditors.contains(dest))
-                    creditors.add(dest);
-            }
-        }
-    }
-
-
-    /**
-     * Decrements credits from a single member, or all members in sent_msgs, depending on whether it is a multicast
-     * or unicast message. No need to acquire mutex (must already be held when this method is called)
-     * @param dest
-     * @param credits
-     * @return The lowest number of credits left, or -1 if a unicast member was not found
-     */
-    private long decrementCredit(Map m, Address dest, long credits) {
-        boolean multicast=dest == null || dest.isMulticastAddress();
-        long    lowest=max_credits, tmp;
-        Long    val;
-
-        if(multicast) {
-            if(m.size() == 0)
-                return -1;
-            Map.Entry entry;
-            for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
-                entry=(Map.Entry)it.next();
-                val=(Long)entry.getValue();
-                tmp=val.longValue();
-                tmp-=credits;
-                entry.setValue(Long.valueOf(tmp));
-                lowest=Math.min(tmp, lowest);
-            }
-            return lowest;
-        }
-        else {
-            val=(Long)m.get(dest);
-            if(val != null) {
-                lowest=val.longValue();
-                lowest-=credits;
-                m.put(dest, Long.valueOf(lowest));
-                return lowest;
-            }
-        }
-        return -1;
-    }
-
-    /**
-     * Handle throttle message from a peer(receiver)
-     * 
-     * @param sender
-     * @param balance thrrottle(sleep) time on this receiver. 
-     */
-    private void handleThrottleRequest(Address sender, long balance) { // GemStoneAddition
-        if(sender == null) {return;}       
-        synchronized(throttle) {
-          // The individual throttling used for p2p send.
-          throttle.put(sender, Long.valueOf(balance));
-          // Used for mcast send, it uses the max throttle time between the
-          // sender. Once we implement the slow-receiver handling capability
-          // we need to change the following code.
-          if (balance > mcast_throttle_time) {
-            mcast_throttle_time = balance;
-          }
-        }
-    }
-    
-    /**
-     * Throttles on the receiver. //GemStoneAddition
-     * @param dest
-     */
-    private void throttleOnReceiver(Address dest) {
-      
-      boolean multicast=dest == null || dest.isMulticastAddress();
-      
-      int sleep = 0;
-      Long p2p_throttle = null;
-      long blockStartTime = 0; 
-      //if (stack.enableClockStats)
-      
-      // Possible multithread (multi-sender) casses:
-      // 1. multiple mcast threads.
-      // 2. multiple p2p threads
-      // 3. While mcast thread throttling, p2p thread arrives.
-      // 4. While p2p thread throttling, mcast thread arrives.
-            
-      if (multicast){
-        synchronized(throttle) {
-          if (mcast_throttle_time <= 0) {return;}    
-          sleep = (int)mcast_throttle_time;
-          mcast_throttle_time = 0;
-        }
-        
-        // block other mcast sender-threads.
-        synchronized(mcast_throttle_mutex) {
-          if (trace) log.trace("### throttling for mcast" + sleep);
-          blockStartTime = stack.gfPeerFunctions.startFlowControlThrottleWait();
-          throttleSleep(sleep);
-          stack.gfPeerFunctions.endFlowControlThrottleWait(blockStartTime);
-        }
-        
-        throttle.clear();
-      }
-      else {
-
-        p2p_throttle = ((Long)throttle.get(dest));
-        if (p2p_throttle != null) {
-          // Throttle other p2p threads on this sender.
-          synchronized (p2p_throttle){
-            blockStartTime = stack.gfPeerFunctions.startFlowControlThrottleWait();
-            sleep = p2p_throttle.intValue();
-            throttleSleep(sleep);
-            stack.gfPeerFunctions.endFlowControlThrottleWait(blockStartTime);
-          }
-        
-          throttle.remove(dest);
-        
-          // During this time if there is no mcast sender, decrement this sleep
-          // time from the mcast throttle time.
-          if (mcast_throttle_time > 0){
-            mcast_throttle_time -= sleep;
-          }
-        }
-      }
-    }
-    
-    /**
-     * Throttles on the receiver. // GemStoneAddition
-     * @param sleep
-     */
-    private void throttleSleep(int sleep) {
-      try {          
-        Thread.sleep(sleep);
-      }
-      catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        // propagated the bit.  Let the caller deal with it.
-      }      
-    }
-
-    /**
-     * Handle replenish message from a peer
-     * 
-     * @param sender
-     * @param balance amount the sender allows us to replenish by
-     */
-    @SuppressFBWarnings(value="IL_INFINITE_RECURSIVE_LOOP", justification="the code is correct")
-    private void handleCredit(Address sender, long balance) { // GemStoneAddition
-        if(sender == null) return;
-        StringBuffer sb=null;
-
-        Object mux = mutex;
-        synchronized(mutex) {
-          if (mutex != mux) { // GemFire bug 40243
-            handleCredit(sender, balance);
-            return;
-          }
-            Long old_entry =(Long)sent.get(sender);// GemStoneAddition
-            if (old_entry == null) {
-              // we don't know about this person.  How can this happen?
-              // My best guess is that since our messages aren't totally ordered,
-              // the peer has departed, but his replenishment request has already
-              // been received.
-              //
-              // If this is indeed what is happening, it's safe to
-              // just ignore this.  2006-05-22
-              return;
-            }
-            long old_credit=old_entry.longValue(); // GemStoneAddition
-            stack.gfPeerFunctions.incJgFCreplenish(1); // GemStoneAddition
-            if(trace) {
-                sb=new StringBuffer();
-                sb.append("received credit <" + balance + "> from ").append(sender).append(", old credit was ").
-                        append(old_credit).append(", new credits are ").append(max_credits).
-                        append(".\nCreditors before are: ").append(creditors);
-            }
-
-            // GemStoneAddition
-            old_credit += balance;
-            if (old_credit > max_credits) {
-              old_credit = max_credits;
-            }
-            sent.put(sender, Long.valueOf(old_credit)); // replenish
-            
-            // this sender is no longer a creditor.
-            if(creditors.size() > 0) {  // we are blocked because we expect credit from one or more members
-                creditors.remove(sender);
-                if(trace) {
-                    sb.append("\nCreditors after removal of ").append(sender).append(" are: ").append(creditors);
-                    log.trace(sb.toString());
-                }
-            }
-
-            lowest_credit=computeLowestCredit(sent);
-            if(insufficient_credit && lowest_credit > 0 && creditors.size() == 0) {
-                insufficient_credit=false;
-                mutex.notifyAll();
-                stack.gfPeerFunctions.incJgFCresumes(1);
-            }
-        }
-    }
-
-    private long computeLowestCredit(Map m) {
-        Collection credits=m.values(); // List of Longs (credits)
-        Long retval=(Long)Collections.min(credits);
-        return retval.longValue();
-    }
-
-
-    /**
-     * Check whether sender has enough credits left. If not, send him some more
-     * @param msg
-     */
-    private void adjustCredit(Message msg) {
-        Address src=msg.getSrc();
-        long    length=msg.getLength(); // we don't care about headers for the purpose of flow control
-
-        if(src == null) {
-            if(log.isErrorEnabled()) log.error(ExternalStrings.FC_SRC_IS_NULL);
-            return;
-        }
-
-        if(length == 0)
-            return; // no effect
-
-        // GemStoneAddition
-        // We have a unicast address here, so determine the balance for
-        // this particular sender
-        long balance = decrementCredit(received, src, length);
-        
-        int throttleTime = 0;
-        if (msg.isCacheOperation) {
-          boolean check = (msg.getDest() == null || msg.getDest().isMulticastAddress());
-          check = check || stack.gfPeerFunctions.getDisableTcp();
-          if (check) {
-            throttleTime = stack.gfPeerFunctions.getSerialQueueThrottleTime(src);
-          }
-        }
-
-        if (throttleTime > 0) {  // GemStoneAddition throttling
-          boolean sendMsg = false;
-          synchronized(throttledSenders) {
-            Long throttleStart = (Long)throttledSenders.get(src);
-            // only send throttles once per second to avoid bombarding the other process
-            long now = System.currentTimeMillis();
-            if (throttleStart == null || (now-throttleStart) > 1000) {
-              throttledSenders.put(src, Long.valueOf(now));
-              sendMsg = true;
-            }
-          }
-          if (sendMsg) {
-            sendThrottleRequest(src, throttleTime);
-          }
-        }        
-
-        else if(balance <= min_credits) {
-          synchronized(throttledSenders) { // GemStoneAddition - throttling
-            throttledSenders.remove(src);
-          }
-            received.put(src, max_credits_constant);
-            if(trace) log.trace("sending replenishment message to " + src + " for " + msg);
-            sendCredit(src, max_credits - balance); // GemStoneAddition
-        }
-    }
-
-    /**
-     * Send a throttle message to a peer (sender)
-     * If the serial queue (executor) is unable to keep-up with incoming message
-     * a throttle request is sent to the sender, so that the messages are not 
-     * lived in the serial-queued executor for long (added as part of bugfix:35268).
-     * @param dest
-     * @param throttleTime - the time to throttle(sleep) GemStoneAddition
-     */
-    private void sendThrottleRequest(Address dest, long throttleTime) {
-        stack.gfPeerFunctions.incJgFCsentThrottleRequests(1); // GemStoneAddition
-
-        Message  msg=new Message(dest, null, null);
-        msg.putHeader(name, new FcHeader(FcHeader.THROTTLE, throttleTime));
-        msg.isHighPriority = true;
-        passDown(new Event(Event.MSG, msg));   
-    }
-
-
-    /**
-     * Send a replenish message to a peer
-     * 
-     * @param dest
-     * @param credit - the amount of credit to send GemStoneAddition
-     */
-    private void sendCredit(Address dest, long credit) {
-        stack.gfPeerFunctions.incJgFCsentCredits(1); // GemStoneAddition
-        Message  msg=new Message(dest, null, null);
-        msg.putHeader(name, new FcHeader(FcHeader.REPLENISH, credit));
-        msg.isHighPriority = true;
-        passDown(new Event(Event.MSG, msg));
-        num_credit_responses_sent++;
-    }
-
-//    /** GemStoneAddition - send a message telling other process that it's not getting more credits
-//        right now */
-//    private void sendNoCredit(Address dest) {
-//        Message  msg=new Message(dest, null, null);
-//        msg.putHeader(name, new FcHeader(FcHeader.WAIT, 0));
-//        msg.bundleable = false;
-//        passDown(new Event(Event.MSG, msg));
-//    }
-
-    private void sendCreditRequest(final Address dest) {
-        Message  msg=new Message(dest, null, null);
-        
-        // No balance sent on a credit request.  Perhaps we should???
-        msg.putHeader(name, new FcHeader(FcHeader.CREDIT_REQUEST, 0)); // GemStoneAddition
-        
-        msg.isHighPriority = true;
-        passDown(new Event(Event.MSG, msg));
-        num_credit_requests_sent++;
-        stack.gfPeerFunctions.incFlowControlRequests(); // GemStoneAddition
-    }
-
-
-    @SuppressFBWarnings(value="IL_INFINITE_RECURSIVE_LOOP", justification="the code is correct")
-    private void handleViewChange(Vector mbrs) {
-        Address addr;
-        if(mbrs == null) return;
-        //if(trace) log.trace("new membership: " + mbrs);
-
-        Object mux = mutex;
-        synchronized(mutex) {
-          if (mutex != mux) { // GemFire bug 40243
-            handleViewChange(mbrs);
-            return;
-          }
-            // add members not in membership to received and sent hashmap (with full credits)
-            for(int i=0; i < mbrs.size(); i++) {
-                addr=(Address) mbrs.elementAt(i);
-                if(!received.containsKey(addr))
-                    received.put(addr, max_credits_constant);
-                if(!sent.containsKey(addr))
-                    sent.put(addr, max_credits_constant);
-            }
-            // remove members that left
-            for(Iterator it=received.keySet().iterator(); it.hasNext();) {
-                addr=(Address) it.next();
-                if(!mbrs.contains(addr))
-                    it.remove();
-            }
-
-            // remove members that left
-            for(Iterator it=sent.keySet().iterator(); it.hasNext();) {
-                addr=(Address)it.next();
-                if(!mbrs.contains(addr))
-                    it.remove(); // modified the underlying map
-            }
-
-            // remove all creditors which are not in the new view
-            for(int i=0; i < creditors.size(); i++) {
-                Address creditor=(Address)creditors.get(i);
-                if(!mbrs.contains(creditor))
-                    creditors.remove(creditor);
-            }
-
-            if(trace) log.trace("creditors are " + creditors);
-            if(insufficient_credit && creditors.size() == 0) {
-                lowest_credit=computeLowestCredit(sent);
-                insufficient_credit=false;
-                mutex.notifyAll();
-            }
-        }
-    }
-
-    private static String printMap(Map m) {
-        Map.Entry entry;
-        StringBuffer sb=new StringBuffer();
-        for(Iterator it=m.entrySet().iterator(); it.hasNext();) {
-            entry=(Map.Entry)it.next();
-            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
-        }
-        return sb.toString();
-    }
-
-
-
-
-
-    public static class FcHeader extends Header implements Streamable {
-        public static final byte REPLENISH      = 1;
-        public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester
-        public static final byte WAIT = 3; // GemStoneAddition
-        public static final byte THROTTLE      = 4;
-        // GemStoneAddition
-        /**
-         * In the event of a REPLENISH request, the balance is the number of
-         * credits that we permit the sender to actually put to his account.
-         */
-        long balance;
-        
-        byte  type = REPLENISH;
-
-        public FcHeader() {
-
-        }
-
-        public FcHeader(byte type, long balance) {
-            this.type=type;
-            this.balance = balance; // GemStoneAddition
-        }
-
-        public long getBalance() { return balance; } // GemStoneAddition
-        
-        @Override // GemStoneAddition
-        public long size(short version) {
-            return Global.BYTE_SIZE + Global.LONG_SIZE; // GemStoneAddition
-        }
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeByte(type);
-            out.writeLong(balance); // GemStoneAddition
-        }
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readByte();
-            balance=in.readLong(); // GemStoneAddition
-        }
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeByte(type);
-            out.writeLong(balance); // GemStoneAddition
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readByte();
-            balance=in.readLong(); // GemStoneAddition
-        }
-
-        @Override // GemStoneAddition
-        public String toString() {
-            switch(type) {
-            case REPLENISH: return "REPLENISH";
-            case CREDIT_REQUEST: return "CREDIT_REQUEST";
-            case WAIT: return "WAIT";
-            case THROTTLE: return "THROTTLE";
-            default: return "<invalid type>";
-            }
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD.java
deleted file mode 100644
index 27333d2..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD.java
+++ /dev/null
@@ -1,1100 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: FD.java,v 1.31 2005/12/16 15:34:13 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import java.util.concurrent.CopyOnWriteArrayList;
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.List;
-
-
-/**
- * Failure detection based on simple heartbeat protocol. Regularly polls members for
- * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple
- * algorithms works as follows: the membership is known and ordered. Each HB protocol
- * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in
- * rank in the membership list, which is recomputed upon a view change. When a response hasn't
- * been received for n milliseconds and m tries, the corresponding member is suspected (and
- * eventually excluded if faulty).<p>
- * FD starts when it detects (in a view change notification) that there are at least
- * 2 members in the group. It stops running when the membership drops below 2.<p>
- * When a message is received from the monitored neighbor member, it causes the pinger thread to
- * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p>
- * When we receive a ping from a member that's not in the membership list, we shun it by sending it a
- * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if
- * <code>shun</code> is true.
- * @author Bela Ban
- * @version $Revision: 1.31 $
- */
-public class FD extends Protocol  {
-    volatile Address               ping_dest=null;  // GemStoneAddition - volatile
-    Address               local_addr=null;
-    long                  timeout=3000;  // number of millisecs to wait for an are-you-alive msg
-    volatile long                  last_ack=System.currentTimeMillis(); // GemStoneAddition - volatile
-    volatile int                   num_tries=0; // GemStoneAddition - volatile
-    int                   max_tries=2;   // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
-    final List            members=new CopyOnWriteArrayList();
-    Address coordinator; // GemStoneAddition
-    final Hashtable       invalid_pingers=new Hashtable(7);  // keys=Address, val=Integer (number of pings from suspected mbrs)
-
-    /** Members from which we select ping_dest. may be subset of {@link #members} */
-    final List            pingable_mbrs=new CopyOnWriteArrayList();
-
-    boolean               shun=true;
-    TimeScheduler         timer=null;
-    Monitor               monitor=null;  // task that performs the actual monitoring for failure detection
-    private final Object  monitor_mutex=new Object();
-    protected/*GemStoneAddition*/ int           num_heartbeats=0;
-    protected/*GemStoneAddition*/ int           num_suspect_events=0;
-
-    /** Transmits SUSPECT message until view change or UNSUSPECT is received */
-    final Broadcaster     bcast_task=new Broadcaster();
-    final static String   name="FD";
-
-    BoundedList           suspect_history=new BoundedList(20);
-
-
-    /** GemStoneAddition active heartbeat_ack sender task */
-    HeartbeatSender     hbsender = null;
-    
-    boolean beingSick; // GemStoneAddition - test hook
-    static boolean DISABLED = Boolean.getBoolean("gemfire.DISABLE_FD"); // GemStoneAddition
-
-
-
-    @Override // GemStoneAddition  
-    public String getName() {return name;}
-    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
-    public String getMembers() {return members != null? members.toString() : "null";}
-    public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
-    public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
-    public int getNumberOfHeartbeatsSent() {return num_heartbeats;}
-    public int getNumSuspectEventsGenerated() {return num_suspect_events;}
-    public long getTimeout() {return timeout;}
-    public void setTimeout(long timeout) {this.timeout=timeout;}
-    public int getMaxTries() {return max_tries;}
-    public void setMaxTries(int max_tries) {this.max_tries=max_tries;}
-    public int getCurrentNumTries() {return num_tries;}
-    public boolean isShun() {return shun;}
-    public void setShun(boolean flag) {this.shun=flag;}
-    public String printSuspectHistory() {
-        StringBuffer sb=new StringBuffer();
-        for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
-            sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
-        }
-        return sb.toString();
-    }
-
-
-    @Override // GemStoneAddition  
-    public boolean setProperties(Properties props) {
-        String str;
-
-        super.setProperties(props);
-        str=props.getProperty("timeout");
-        if(str != null) {
-            timeout=Long.parseLong(str);
-            props.remove("timeout");
-        }
-
-        str=props.getProperty("max_tries");  // before suspecting a member
-        if(str != null) {
-            max_tries=Integer.parseInt(str);
-            props.remove("max_tries");
-        }
-
-        str=props.getProperty("shun");
-        if(str != null) {
-            shun=Boolean.valueOf(str).booleanValue();
-            props.remove("shun");
-        }
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.FD_FDSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
-            return false;
-        }
-        return true;
-    }
-
-    @Override // GemStoneAddition  
-    public void resetStats() {
-        num_heartbeats=num_suspect_events=0;
-        suspect_history.removeAll();
-    }
-
-
-    @Override // GemStoneAddition  
-    public void init() throws Exception {
-        if(stack != null && stack.timer != null)
-            timer=new TimeScheduler(60000); // GemStoneAddition: run monitor in a separate 
-                                            // timer since it can take a while to run
-        else
-            throw new Exception(getName()+".init(): timer cannot be retrieved from protocol stack");
-    }
-
-
-    /**
-     * Just ensure that this class gets loaded.
-     * 
-     * @see SystemFailure#loadEmergencyClasses()
-     */
-    public static void loadEmergencyClasses() { // GemStoneAddition
-      // no further action required
-    }
-    
-    /**
-     * Kill the Monitor and the HeartbeadSender
-     * 
-     * @see SystemFailure#emergencyClose()
-     */
-    public void emergencyClose() { // GemStoneAddition
-//      stop();
-
-      Monitor m = monitor;
-      if (m != null) {
-        m.stop();
-      }
-      HeartbeatSender hb = this.hbsender;
-      if (hb != null) {
-        hb.stop();
-      }
-    }
-    
-    @Override // GemStoneAddition  
-    public void stop() {
-        stopMonitor();
-    }
-
-
-    protected/*GemStoneAddition*/ Address getPingDest(List mbrs) {
-        Object current_dest = ping_dest; // GemStoneAddition
-        
-        // GemStoneAddition - copy the list and iterate over the copy
-        synchronized(mbrs) {
-          mbrs = new ArrayList(mbrs);
-        }
-
-        if(/*mbrs == null || */ mbrs.size() < 2 || local_addr == null)
-            return null;
-
-        int myIndex = mbrs.indexOf(local_addr);
-        if (myIndex < 0) {
-          return null;
-        }
-        
-        // GemStoneAddition - broadcaster tracks suspects, which are in
-        // mbrs list and must be skipped here
-        int neighborIndex = myIndex;
-        boolean wrapped = false;
-        Address neighborAddr = null;
-        do {
-          neighborIndex++;
-          if (neighborIndex > (mbrs.size()-1)) {
-            neighborIndex = 0;
-            wrapped = true;
-          }
-          if (wrapped && (neighborIndex == myIndex)) {
-            neighborAddr = null;
-            break;
-          }
-          neighborAddr = (Address)mbrs.get(neighborIndex);
-        } while (bcast_task.isSuspectedMember(neighborAddr));
-
-        // GemStoneAddition - reset timestamp and ack count if we change ping_dest
-        // to avoid immediately suspecting new member when the change happened due
-        // to a member being ejected
-        if (current_dest != null  &&  neighborAddr != null  &&  !current_dest.equals(neighborAddr)) {
-          last_ack = System.currentTimeMillis();
-          num_tries = 0;
-        }
-        return neighborAddr;
-    }
-
-
-    /** GemStoneAddition - active heartbeat destination determination */
-    Address getHeartbeatDest() {
-      List mbrs;
-      
-      synchronized(pingable_mbrs) {
-        mbrs = new ArrayList(pingable_mbrs);
-      }
-
-      int myIndex = mbrs.indexOf(local_addr);
-      if (myIndex == 0) {
-        return null;
-      }
-      
-      // GemStoneAddition - broadcaster tracks suspects, which are in
-      // mbrs list and must be skipped here
-      int neighborIndex = myIndex;
-      boolean wrapped = false;
-      Address neighborAddr = null;
-      do {
-        neighborIndex--;
-        if (neighborIndex < 0) {
-          neighborIndex = mbrs.size()-1;
-          wrapped = true;
-        }
-        if (wrapped && (neighborIndex == myIndex)) {
-          neighborAddr = null;
-          break;
-        }
-        neighborAddr = (Address)mbrs.get(neighborIndex);
-      } while (bcast_task.isSuspectedMember(neighborAddr));
-
-      return neighborAddr;
-    }
-
-
-    private void startMonitor() {
-      if (DISABLED || disconnecting) {
-        return;
-      }
-        synchronized(monitor_mutex) {
-            if(monitor != null && monitor.started == false) {
-                monitor=null;
-            }
-            if(monitor == null) {
-                monitor=new Monitor();
-                last_ack=System.currentTimeMillis();  // start from scratch
-                num_tries=0;  // GemStoneAddition - initialize this before scheduling the monitor
-                timer.add(monitor, true);  // fixed-rate scheduling
-            }
-            // GemStoneAddition - start heartbeat sender task
-            if (this.hbsender != null && this.hbsender.started == false) {
-              this.hbsender = null;
-            }
-            if (this.hbsender == null) {
-              this.hbsender = new HeartbeatSender();
-              // run the hb sender in the stack's timer so it isn't blocked by the Monitor
-              stack.timer.add(this.hbsender, true);
-            }
-        }
-    }
-
-    private void stopMonitor() {
-        synchronized(monitor_mutex) {
-            if(monitor != null) {
-                monitor.stop();
-                monitor=null;
-            }
-            // GemStoneAddition - stop heartbeat sender task
-            if (this.hbsender != null) {
-              this.hbsender.stop();
-              this.hbsender = null;
-            }
-        }
-    }
-
-    private boolean isCoordinator; // GemStoneAddition
-    private boolean disconnecting; // GemStoneAddition
-
-    @Override // GemStoneAddition  
-    public void up(Event evt) {
-        Message msg;
-        FdHeader hdr;
-        Object sender, tmphdr;
-        // GemStoneAddition - avoid race conditions by reading ping_dest and caching it
-        Address pd = ping_dest;
-
-        switch(evt.getType()) {
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address)evt.getArg();
-            break;
-            
-        case Event.MSG:
-          if (DISABLED || disconnecting) {
-            break;
-          }
-            msg=(Message)evt.getArg();
-            // GemStoneAddition - check for mismatched configuration with FD_SOCK
-            
-            tmphdr=msg.getHeader(getName());
-            if(tmphdr == null || !(tmphdr instanceof FdHeader)) {
-                if(pd != null && (sender=msg.getSrc()) != null) {
-                    if(pd.equals(sender)) {
-                        last_ack=System.currentTimeMillis();
-//                        if(trace)
-//                            log.trace("received msg from " + sender + " (counts as heartbeat)");
-                        num_tries=0;
-                    }
-                }
-                break;  // message did not originate from FD layer, just pass up
-            }
-
-            hdr=(FdHeader)msg.removeHeader(getName());
-            switch(hdr.type) {
-            case FdHeader.HEARTBEAT:                       // heartbeat request; send heartbeat ack
-              if (this.beingSick) { // GemStoneAddition - test hook
-                break;
-              }
-                Address hb_sender=msg.getSrc();
-                Message hb_ack=new Message(hb_sender, null, null);
-                hb_ack.isHighPriority = true;
-                FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK);
-
-                // 1.  Send an ack
-                tmp_hdr.from=local_addr;
-                hb_ack.putHeader(getName(), tmp_hdr);
-                if(trace)
-                    log.trace(getLocalAddress() + ":" + getName() + " received heartbeat request from " + hb_sender + ", sending heartbeat");
-                passDown(new Event(Event.MSG, hb_ack));
-
-                // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause
-                //    the sender to leave the group (and possibly rejoin it later)
-                if(shun)
-                    shunInvalidHeartbeatSender(hb_sender);
-                break;                                     // don't pass up !
-
-            case FdHeader.HEARTBEAT_ACK:                   // heartbeat ack
-                if(pd != null && pd.equals(hdr.from)) {
-                    last_ack=System.currentTimeMillis();
-                    num_tries=0;
-                    if(log.isDebugEnabled()) log.debug(getLocalAddress() + ":" + getName() + " received heartbeat from " + hdr.from);
-                }
-                else {
-                    stop();
-                    if (log.isDebugEnabled()) log.debug(getLocalAddress() + ":" + getName() + " received heartbeat from " + hdr.from + " who is not my ping-dest (" + pd + ")");
-                    ping_dest=getPingDest(pingable_mbrs);
-                    pd = ping_dest;
-                    if(pd != null) {
-                        try {
-                            startMonitor();
-                        }
-                        catch(Exception ex) {
-                            if(warn) log.warn(ExternalStrings.FD_EXCEPTION_WHEN_CALLING_STARTMONITOR, ex);
-                        }
-                    }
-                    if (log.isDebugEnabled()) log.debug(getLocalAddress() + ":" + getName() + " ping_dest is now " + pd);
-                }
-                break;
-
-            case FdHeader.SUSPECT:
-                if(hdr.mbrs != null) {
-                    if(trace) log.trace("[SUSPECT] suspect hdr is " + hdr);
-                    // GemStoneAddition - log the notification
-//                    log.getLogWriterI18n().info(
-//                      JGroupsStrings.FD_RECEIVED_SUSPECT_NOTIFICATION_FOR_MEMBERS_0_FROM_1_2,
-//                      new Object[] {hdr.mbrs, msg.getSrc(), ""});
-                    // GemStoneAddition - if the sender isn't in this member's view,
-                    // and this is the coordinator, he may have been ousted from
-                    // the system and should be told so
-                    if (!isInMembership(msg.getSrc())) {
-                      break;
-                    }
-                    for(int i=0; i < hdr.mbrs.size(); i++) {
-                        Address m=(Address)hdr.mbrs.elementAt(i);
-                        if(local_addr != null && m.equals(local_addr)) {
-                            if(warn)
-                                log.warn("I was suspected, but will not remove myself from membership " +
-                                         "(waiting for EXIT message)");
-                        }
-                        else {
-                          // GemStoneAddition - broadcaster tracks suspects, and
-                          // they are not removed from pingable_mbrs
-                          bcast_task.addSuspectedMember(m);
-//                          synchronized(pingable_mbrs) { // GemStoneAddition - synch on this
-//                            pingable_mbrs.remove(m);
-//                          }
-                            ping_dest=getPingDest(pingable_mbrs);
-                            if (log.isDebugEnabled()) log.debug("Old "+getName()+" ping-dest was susepected, so selected new ping-dest " + ping_dest);
-                            pd = ping_dest;
-                            if (pd != null) { // GemStoneAddition - start the monitor
-                              try {
-                                startMonitor();
-                              } catch (Exception ex) {
-                                if (warn) log.warn("exception when calling startMonitor()", ex);
-                              }
-                            }
-                        }
-                        passUp(new Event(Event.SUSPECT, new SuspectMember(msg.getSrc(), m))); // GemStoneAddition SuspectMember struct
-                        passDown(new Event(Event.SUSPECT, new SuspectMember(msg.getSrc(), m)));
-                    }
-                }
-                break;
-
-            case FdHeader.NOT_MEMBER:
-                if(shun) {
-                    log.getLogWriter().severe(ExternalStrings.FD_RECEIVED_NOT_MEMBER_MESSAGE_FROM_0_THIS_VM_IS_NO_LONGER_A_MEMBER_EXITING, msg.getSrc());
-                    passUp(new Event(Event.EXIT, stack.gfBasicFunctions.getForcedDisconnectException(
-                      ExternalStrings.FD_THIS_MEMBER_HAS_BEEN_FORCED_OUT_OF_THE_DISTRIBUTED_SYSTEM_PLEASE_CONSULT_GEMFIRE_LOGS_TO_FIND_THE_REASON_FD.toLocalizedString())));
-                }
-                break;
-            }
-//            return; GemStoneAddition - let VERIFY_SUSPECT see this traffic
-        }
-        passUp(evt); // pass up to the layer above us
-    }
-
-    
-    public void beSick() { // GemStoneAddition
-      this.beingSick = true;
-    }
-    
-    public void beHealthy() { // GemStoneAddition
-      this.beingSick = false;
-    }
-
-    /**
-     * GemStoneAddition - allows notification of msg being received from
-     * a member through GemFire's other communication channels
-     * 
-     * @param sender the address that sent the message
-     */
-    public void messageReceivedFrom(Address sender) {
-      if (DISABLED || disconnecting) {
-        return;
-      }
-      Address pd = ping_dest;
-      if(pd != null  &&  pd.equals(sender)) {
-        last_ack=System.currentTimeMillis();
-//        if(trace)
-//            log.trace("FD received msg from " + sender + " (counts as heartbeat)");
-        num_tries=0;
-      }
-    }
-
-
-    
-    @Override // GemStoneAddition  
-    public void down(Event evt) {
-        View v;
-        // GemStoneAddtition - avoid race conditions by reading ping_dest once and caching it
-        Address pd = ping_dest;
-
-        switch(evt.getType()) {
-        case Event.MSG:
-          if (DISABLED || disconnecting) {
-            passDown(evt);
-            break;
-          }
-          Message msg = (Message)evt.getArg();
-          FD_SOCK.FdHeader hdr = (FD_SOCK.FdHeader)msg.getHeader("FD_SOCK");
-          if (hdr != null && hdr.type == FD_SOCK.FdHeader.SUSPECT
-              && hdr.mbrs.contains(pd)) {
-            // my ping_dest has been suspected by FD_SOCK, so go on to the
-            // next
-            // GemStoneAddition - bcaster tracks all suspected members now
-            for (Iterator it=hdr.mbrs.iterator(); it.hasNext(); ) {
-              bcast_task.addSuspectedMember((Address)it.next());
-            }
-//            synchronized (pingable_mbrs) {
-//              pingable_mbrs.removeAll(hdr.mbrs); 
-//            }
-            passDown(evt);
-            ping_dest=getPingDest(pingable_mbrs);
-            pd = ping_dest;
-            if (log.isDebugEnabled()) log.debug(getLocalAddress() + ": " + getName() + " ping-dest is now " + pd);
-            if (pd != null) {
-              try {
-                startMonitor();
-              } catch (Exception ex) {
-                if (warn) {
-                  log.warn(ExternalStrings.FD_EXCEPTION_WHEN_CALLING_STARTMONITOR, ex);
-                }
-              }
-            }
-          }
-          else {
-            passDown(evt);
-          }
-          break;
-            
-        case Event.VIEW_CHANGE:
-          if (DISABLED || disconnecting) {
-            passDown(evt);
-            break;
-          }
-            synchronized(this) {
-                stop();
-                v=(View)evt.getArg();
-                this.coordinator = v.getCreator(); // GemStoneAddition - send heartbeat to coordinator, too
-                members.clear();
-                members.addAll(v.getMembers());
-                bcast_task.adjustSuspectedMembers(members);
-                synchronized(pingable_mbrs) {
-                  Address coord = new Membership(v.getMembers()).getCoordinator(); // GemStoneAddition
-                  this.isCoordinator = this.local_addr != null
-                    && coord != null
-                    && this.local_addr.equals(coord);
-                  pingable_mbrs.clear();
-                  pingable_mbrs.addAll(members);
-                }
-                passDown(evt);
-                ping_dest=getPingDest(pingable_mbrs);
-                if (log.isDebugEnabled()) log.debug(getLocalAddress()+":"+getName()+" ping-dest is now " + ping_dest + " and coordinator is " + coordinator);
-                pd = ping_dest;
-                if(pd != null) {
-                    try {
-                        startMonitor();
-                    }
-                    catch(Exception ex) {
-                        if(warn) log.warn("exception when calling startMonitor()", ex);
-                    }
-                }
-            }
-            break;
-
-        case Event.UNSUSPECT:
-          if (DISABLED || disconnecting) {
-            passDown(evt);
-            break;
-          }
-          Address mbr = (Address)evt.getArg();
-          if (log.isDebugEnabled()) {
-            StringBuffer sb = new StringBuffer(getName()+" is unsuspecting ").append(mbr);
-            log.getLogWriter().info(ExternalStrings.DEBUG, sb);
-          }
-            unsuspect(mbr);
-            // GemStoneAddition - select ping_dest here instead of in unsuspect()
-            ping_dest=getPingDest(pingable_mbrs);
-            if (log.isDebugEnabled()) {
-              StringBuffer sb = new StringBuffer(getLocalAddress()+":"+getName()+" ping-dest is now ").append(ping_dest);
-              log.getLogWriter().info(ExternalStrings.DEBUG, sb);
-            }
-            pd = ping_dest;
-            if (pd != null) { // GemStoneAddition - start the monitor
-              try {
-                startMonitor();
-              } catch (Exception ex) {
-                if (warn) log.warn("exception when calling startMonitor()", ex);
-              }
-            }
-            passDown(evt);
-            break;
-
-        case Event.DISCONNECTING: // GemStoneAddition - make sure we stop shunning/suspecting at this point
-          this.disconnecting = true;
-          passDown(evt);
-          stop();
-          break;
-          
-        case Event.START: // GemStoneAddition - reset state when restarting
-          this.disconnecting = false;
-          passDown(evt);
-          break;
-
-        default:
-            passDown(evt);
-            break;
-        }
-    }
-
-
-    private void unsuspect(Address mbr) {
-        bcast_task.removeSuspectedMember(mbr);
-//        synchronized(pingable_mbrs) { // GemStoneAddition - synch on this
-//          pingable_mbrs.clear();
-//          pingable_mbrs.addAll(members);
-          // GemStoneAddition - pingable_mbrs contains all members, both suspect and non-suspect
-//          pingable_mbrs.removeAll(bcast_task.getSuspectedMembers());
-//        }
-//        if (log.isDebugEnabled()) log.debug("unsuspected " + mbr + " in FD.  ping-dest is now " + ping_dest);
-    }
-
-
-    
-    /**
-     * GemStoneAddition if this is the coordinator, see if the member is in the
-     * current view.  Otherwise punt and say he is in the view
-     */
-    private boolean isInMembership(Address sender) {
-      if (this.isCoordinator) {
-        if (pingable_mbrs != null) {
-          synchronized(pingable_mbrs) {
-            Set m = new HashSet(pingable_mbrs);
-            return m.contains(sender);
-          }
-        }
-      }
-      return true;
-    }
-    
-    public void SUSPECT_ALL() {
-      log.getLogWriter().severe(ExternalStrings.ONE_ARG, getName()+".SUSPECT_ALL invoked", new Exception("stack trace"));
-      synchronized(pingable_mbrs) {
-        for (Iterator it=pingable_mbrs.iterator(); it.hasNext(); ) {
-          Address mbr = (Address)it.next();
-          if (!mbr.equals(this.local_addr)) {
-            Message msg = new Message();
-            FD_SOCK.FdHeader hdr = new FD_SOCK.FdHeader(FD_SOCK.FdHeader.FD_SUSPECT, mbr);
-            msg.putHeader("FD_SOCK", hdr);
-            passUp(new Event(Event.MSG, msg));
-          }
-        }
-      }
-    }
-        
-    /**
-     * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)
-     */
-    private void shunInvalidHeartbeatSender(Address hb_sender) {
-        int num_pings=0;
-        Message shun_msg;
-        
-        // GemStoneAddition - access members under sync
-        boolean notMember;
-        synchronized (this) {
-          notMember = hb_sender != null && members != null && !members.contains(hb_sender);
-        }
-
-        if(notMember) {
-            if(invalid_pingers.containsKey(hb_sender)) {
-                num_pings=((Integer)invalid_pingers.get(hb_sender)).intValue();
-                if(num_pings >= max_tries) {
-                    if(log.isDebugEnabled())
-                        log.debug(hb_sender + " is not in " + members + " ! Shunning it");
-                    shun_msg=new Message(hb_sender, null, null);
-                    shun_msg.putHeader(getName(), new FdHeader(FdHeader.NOT_MEMBER));
-                    shun_msg.isHighPriority = true;
-                    passDown(new Event(Event.MSG, shun_msg));
-                    invalid_pingers.remove(hb_sender);
-                }
-                else {
-                    num_pings++;
-                    invalid_pingers.put(hb_sender, Integer.valueOf(num_pings));
-                }
-            }
-            else {
-                num_pings++;
-                invalid_pingers.put(hb_sender, Integer.valueOf(num_pings));
-            }
-        }
-    }
-
-
-    public static class FdHeader extends Header implements Streamable {
-        public static final byte HEARTBEAT=0;
-        public static final byte HEARTBEAT_ACK=1;
-        public static final byte SUSPECT=2;
-        public static final byte NOT_MEMBER=3;  // received as response by pinged mbr when we are not a member
-
-
-        byte    type=HEARTBEAT;
-        Vector  mbrs=null;
-        Address from=null;  // member who detected that suspected_mbr has failed
-
-
-
-        public FdHeader() {
-        } // used for externalization
-
-        public FdHeader(byte type) {
-            this.type=type;
-        }
-
-        public FdHeader(byte type, Vector mbrs, Address from) {
-            this(type);
-            this.mbrs=mbrs;
-            this.from=from;
-        }
-
-
-        @Override // GemStoneAddition  
-        public String toString() {
-            switch(type) {
-                case HEARTBEAT:
-                    return "[FD: heartbeat request]";
-                case HEARTBEAT_ACK:
-                    return "[FD: heartbeat]";
-                case SUSPECT:
-                    return "[FD: SUSPECT (suspected_mbrs=" + mbrs + ", from=" + from + ")]";
-                case NOT_MEMBER:
-                    return "[FD: NOT_MEMBER]";
-                default:
-                    return "[FD: unknown type (" + type + ")]";
-            }
-        }
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeByte(type);
-            if(mbrs == null)
-                out.writeBoolean(false);
-            else {
-                out.writeBoolean(true);
-                out.writeInt(mbrs.size());
-                for(Iterator it=mbrs.iterator(); it.hasNext();) {
-                    Address addr=(Address)it.next();
-                    Marshaller.write(addr, out);
-                }
-            }
-            Marshaller.write(from, out);
-        }
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readByte();
-            boolean mbrs_not_null=in.readBoolean();
-            if(mbrs_not_null) {
-                int len=in.readInt();
-                mbrs=new Vector(11);
-                for(int i=0; i < len; i++) {
-                    Address addr=(Address)Marshaller.read(in);
-                    mbrs.add(addr);
-                }
-            }
-            from=(Address)Marshaller.read(in);
-        }
-
-
-        @Override // GemStoneAddition  
-        public long size(short version) {
-            int retval=Global.BYTE_SIZE; // type
-            retval+=Util.size(mbrs, version);
-            retval+=Util.size(from, version);
-            return retval;
-        }
-
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            out.writeByte(type);
-            Util.writeAddresses(mbrs, out);
-            Util.writeAddress(from, out);
-        }
-
-
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            type=in.readByte();
-            mbrs=(Vector)Util.readAddresses(in, Vector.class);
-            from=Util.readAddress(in);
-        }
-
-    }
-
-
-    /**
-     * GemStoneAddition - for idle processes, send a heartbeat ack every once
-     * in a while to keep the other process from having to send a request for
-     * one.
-     */
-    protected class HeartbeatSender implements TimeScheduler.Task {
-
-      volatile /* GemStoneAddition */ boolean started = true;
-      long interval = FD.this.timeout * 2 / 3;
-
-      public void stop() {
-        started = false;
-      }
-      
-      public boolean cancelled() {
-        return !started;
-      }
-      public long nextInterval() {
-        return (interval <= 0? 25 : interval);
-      }
-      
-      @Override // GemStoneAddition  
-      public String toString() {
-          return getName()+" heartbeat sender: " + started;
-      }
-
-      public void run() {
-        if (!beingSick) {
-          Address receiver = FD.this.getHeartbeatDest();
-          Address coord = coordinator;
-          if (log.isDebugEnabled()) {
-            log.debug("sending heartbeat to " + receiver + " and coordinator " + coord);
-          }
-          if (receiver != null) {
-            Message msg = new Message();
-            msg.setDest(receiver);
-            msg.putHeader(getName(), new FdHeader(FdHeader.HEARTBEAT_ACK,
-                null, FD.this.local_addr));
-            msg.isHighPriority = true;
-            if (!started) return; // GemStoneAddition -- last-chance check
-            FD.this.passDown(new Event(Event.MSG, msg));
-          }
-          // GemStoneAddition - also send a heartbeat to the coordinator
-          if (coord != null) {
-            Message msg = new Message();
-            msg.setDest(coord);
-            msg.putHeader(getName(), new FdHeader(FdHeader.HEARTBEAT_ACK,
-                null, FD.this.local_addr));
-            msg.isHighPriority = true;
-            if (!started) return; // GemStoneAddition -- last-chance check
-            FD.this.passDown(new Event(Event.MSG, msg));
-          }
-        }
-      }
-    }
-    
-    protected/*GemStoneAddition*/ class Monitor implements TimeScheduler.Task {
-        volatile /* GemStoneAddition */ boolean started=true;
-
-        public void stop() {
-            started=false;
-        }
-
-
-        public boolean cancelled() {
-            return !started;
-        }
-
-
-        /** this is the number of milliseconds until the task should be run again */
-        public long nextInterval() {
-          return timeout;
-        }
-
-
-        public void run() {
-            Message hb_req;
-            long not_heard_from; // time in msecs we haven't heard from ping_dest
-            // GemStoneAddition - avoid race conditions by reading ping_dest only once
-            Address pd = ping_dest;
-
-            if (beingSick) {
-              return;
-            }
-            
-            if(pd == null) {
-                // GemStoneAddition - changed from warn() to debug() since we now remove
-                // suspected mbrs from pingable_members and recalculate ping_dest in
-                // this method
-                if(log.isDebugEnabled())
-                    log.debug("ping_dest is null: members=" + members + ", pingable_mbrs=" +
-                            pingable_mbrs + ", local_addr=" + local_addr);
-                return;
-            }
-
-
-            // 1. send heartbeat request
-            hb_req=new Message(pd, null, null);
-            hb_req.putHeader(getName(), new FdHeader(FdHeader.HEARTBEAT));  // send heartbeat request
-            hb_req.isHighPriority = true;
-//            if(log.isDebugEnabled())
-//                log.debug("sending heartbeat request to " + pd + " (own address=" + local_addr + ')'); // GemStoneAddition - this said "are-you-alive msg"
-            if (!started) return; // GemStoneAddition
-            passDown(new Event(Event.MSG, hb_req));
-            num_heartbeats++;
-            
-            // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
-            //    received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
-            //    a new view
-            not_heard_from=System.currentTimeMillis() - last_ack;
-            // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
-//            if(log.isDebugEnabled())
-//              log.debug("FD running in " + local_addr + ":"+getName()+" watching " + pd +
-//                      " not_heard_from=" + not_heard_from + " timeout=" + (timeout+500) +
-//                      " num_tries=" + num_tries + " max_tries="+max_tries);
-            if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
-                if(num_tries >= max_tries) {
-                    if(log.isDebugEnabled())
-                        log.debug("[" + local_addr + "]:"+getName()+" received no heartbeat ack from " + pd +
-                                " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) +
-                                " milliseconds), suspecting it");
-                    // broadcast a SUSPECT message to all members - loop until
-                    // unsuspect or view change is received
-                    //bcast_task.addSuspectedMember(pd);
-                    FD_SOCK fdsock = (FD_SOCK)stack.findProtocol("FD_SOCK");
-                    if (fdsock != null && !fdsock.checkSuspect(pd, getName()+" heartbeat timeout")) {
-                      // GemStoneAddition - add to suspected mbrs and recalc ping_dest
-                      synchronized(pingable_mbrs) {
-                        //pingable_mbrs.remove(pd);
-                        bcast_task.addSuspectedMember(pd);
-                        ping_dest = getPingDest(pingable_mbrs);
-                        if (log.isDebugEnabled()) log.debug(getLocalAddress()+":"+getName()+" ping-dest is now suspect.  new ping-dest is " + ping_dest);
-                        if (ping_dest == null) {
-                          stop();
-                        }
-                      }
-                    }
-                    num_tries=0;
-                    if(stats) {
-                        num_suspect_events++;
-                        suspect_history.add(pd);
-                    }
-                }
-                else {
-                    if(log.isDebugEnabled())
-                        log.debug("heartbeat missing from " + pd + " (number=" + num_tries + ')');
-                    num_tries++;
-                }
-            }
-        }
-
-
-        @Override // GemStoneAddition  
-        public String toString() {
-            return getName()+" heartbeat monitor: " + started;
-        }
-
-    }
-
-
-    /**
-     * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
-     * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
-     * sure they are retransmitted until a view has been received which doesn't contain the suspected members
-     * any longer. Then the task terminates.
-     */
-    protected/*GemStoneAddition*/ class Broadcaster {
-        private final Vector suspected_mbrs=new Vector(7);
-//        BroadcastTask task=null; GemStoneAddition
-//        private final Object bcast_mutex=new Object(); GemStoneAddition
-
-
-//        Vector getSuspectedMembers() {
-//            return suspected_mbrs;
-//        }
-//
-        /**
-         * Starts a new task, or - if already running - adds the argument to the running task.
-         * @param suspect
-         */
-//        private void startBroadcastTask(Address suspect) {
-//            synchronized(bcast_mutex) {
-//                if(task == null || task.cancelled()) {
-//                    task=new BroadcastTask((Vector)suspected_mbrs.clone());
-//                    task.addSuspectedMember(suspect);
-//                    task.run();      // run immediately the first time
-//                    timer.add(task); // then every timeout milliseconds, until cancelled
-//                    if(trace)
-//                        log.trace("BroadcastTask started");
-//                }
-//                else {
-//                    task.addSuspectedMember(suspect);
-//                }
-//            }
-//        }
-
-//        private void stopBroadcastTask() {
-//            synchronized(bcast_mutex) {
-//                if(task != null) {
-//                    task.stop();
-//                    task=null;
-//                }
-//            }
-//        }
-
-
-        // GemStoneAddition - the broadcaster is disabled in GemFire.  All
-        // SUSPECT messages are sent by FD_SOCK after performing socket-connect
-        // verification
-        
-        /** Adds a suspected member. Starts the task if not yet running */
-        void addSuspectedMember(Address mbr) {
-            if(mbr == null) return;
-            synchronized(this) {  // GemStone - since members may be cleared, we need a sync
-              if(!members.contains(mbr)) return;
-            }
-            synchronized(suspected_mbrs) {
-                if(!suspected_mbrs.contains(mbr)) {
-                    suspected_mbrs.addElement(mbr);
-//                    startBroadcastTask(mbr);
-                }
-            }
-        }
-
-        void removeSuspectedMember(Address suspected_mbr) {
-            if(suspected_mbr == null) return;
-            if(log.isDebugEnabled()) log.debug("removing suspect member " + suspected_mbr);
-            synchronized(suspected_mbrs) {
-                suspected_mbrs.removeElement(suspected_mbr);
-//                if(suspected_mbrs.size() == 0)
-//                    stopBroadcastTask();
-            }
-        }
-        
-        /**
-         * GemStoneAddition - test to see if member is currently suspected
-         * @param mbr the address of the member in question
-         * @return true if the member is under suspicion
-         */
-        boolean isSuspectedMember(Address mbr) {
-          synchronized(suspected_mbrs) {
-            return suspected_mbrs.contains(mbr);
-          }
-        }
-
-        void removeAll() {
-            synchronized(suspected_mbrs) {
-                suspected_mbrs.removeAllElements();
-//                stopBroadcastTask();
-            }
-        }
-
-        /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */
-        void adjustSuspectedMembers(List new_mbrship) {
-            if(new_mbrship == null || new_mbrship.size() == 0) return;
-            StringBuffer sb=new StringBuffer();
-            synchronized(suspected_mbrs) {
-                if (log.isDebugEnabled()) sb.append("suspected_mbrs: ").append(suspected_mbrs);
-                suspected_mbrs.retainAll(new_mbrship);
-//                if(suspected_mbrs.size() == 0)
-//                    stopBroadcastTask();
-                if (log.isDebugEnabled()) sb.append(", after adjustment: ").append(suspected_mbrs);
-                log.debug(sb.toString());
-            }
-        }
-    }
-
-/*
-    private class BroadcastTask implements TimeScheduler.Task {
-        boolean cancelled=false;
-        private final Vector suspected_members=new Vector();
-
-
-        BroadcastTask(Vector suspected_members) {
-            this.suspected_members.addAll(suspected_members);
-        }
-
-        public void stop() {
-            cancelled=true;
-            suspected_members.clear();
-            if(trace)
-                log.trace("BroadcastTask stopped");
-        }
-
-        public boolean cancelled() {
-            return cancelled;
-        }
-
-        public long nextInterval() {
-            return FD.this.timeout;
-        }
-
-        public void run() {
-            Message suspect_msg;
-            FD.FdHeader hdr;
-
-            synchronized(suspected_members) {
-                if(suspected_members.size() == 0) {
-                    stop();
-                    if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
-                    return;
-                }
-
-                hdr=new FdHeader(FdHeader.SUSPECT);
-                hdr.mbrs=(Vector)suspected_members.clone();
-                hdr.from=local_addr;
-            }
-            suspect_msg=new Message();       // mcast SUSPECT to all members
-            suspect_msg.putHeader(name, hdr);
-            suspect_msg.isHighPriority = true;
-            if(log.isDebugEnabled())
-                log.debug("broadcasting SUSPECT message [suspected_mbrs=" + suspected_members + "] to group");
-            passDown(new Event(Event.MSG, suspect_msg));
-            if(log.isDebugEnabled()) log.debug("task done");
-        }
-
-        public void addSuspectedMember(Address suspect) {
-            if(suspect != null && !suspected_members.contains(suspect)) {
-                suspected_members.add(suspect);
-            }
-        }
-    }
-*/
-}


Mime
View raw message