geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jche...@apache.org
Subject incubator-geode git commit: GEODE-77(Sub-Task GEODE-205) After a network outage is fixed a former member can rejoin
Date Tue, 22 Sep 2015 18:26:00 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 377a495b8 -> 99e50c12a


GEODE-77(Sub-Task GEODE-205) After a network outage is fixed a former member can rejoin


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/99e50c12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/99e50c12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/99e50c12

Branch: refs/heads/feature/GEODE-77
Commit: 99e50c12ae710264f61b6db4f149f8cae07db521
Parents: 377a495
Author: Jianxia Chen <jchen@pivotal.io>
Authored: Tue Sep 22 11:16:46 2015 -0700
Committer: Jianxia Chen <jchen@pivotal.io>
Committed: Tue Sep 22 11:16:46 2015 -0700

----------------------------------------------------------------------
 .../internal/InternalDistributedSystem.java     |  15 +-
 .../internal/membership/gms/Services.java       |  15 +
 .../membership/gms/interfaces/JoinLeave.java    |   8 +-
 .../membership/gms/interfaces/Manager.java      |  14 +-
 .../membership/gms/interfaces/Messenger.java    |   7 +-
 .../membership/gms/membership/GMSJoinLeave.java |  22 +-
 .../membership/gms/messenger/GMSPingPonger.java |  41 +++
 .../gms/messenger/GMSQuorumChecker.java         | 243 +++++++++++++
 .../gms/messenger/JGroupsMessenger.java         |  53 ++-
 .../gms/mgr/GMSMembershipManager.java           | 162 ++++-----
 .../messenger/GMSQuorumCheckerJUnitTest.java    | 345 +++++++++++++++++++
 .../messenger/JGroupsMessengerJUnitTest.java    | 154 ++++++++-
 12 files changed, 938 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
index 3ed9607..a14a332 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -924,7 +924,8 @@ public final class InternalDistributedSystem
     if (isForcedDisconnect) {
       this.forcedDisconnect = true;
       resetReconnectAttemptCounter();
-      reconnected = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
+    
+     reconnected = tryReconnect(true, reason, GemFireCacheImpl.getInstance());
     }
     if (!reconnected) {
       disconnect(false, reason, shunned);
@@ -2475,20 +2476,17 @@ public final class InternalDistributedSystem
    */
   public boolean tryReconnect(boolean forcedDisconnect, String reason, GemFireCacheImpl oldCache) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    
     synchronized (CacheFactory.class) { // bug #51335 - deadlock with app thread trying to create a cache
       synchronized (GemFireCacheImpl.class) {
         // bug 39329: must lock reconnectLock *after* the cache
         synchronized (reconnectLock) {
-          if (!forcedDisconnect &&
-              !oldCache.isClosed() &&
-              oldCache.getCachePerfStats().getReliableRegionsMissing() == 0) {
+          if (!forcedDisconnect && !oldCache.isClosed() && oldCache.getCachePerfStats().getReliableRegionsMissing() == 0) {
             if (isDebugEnabled) {
               logger.debug("tryReconnect: No required roles are missing.");
             }
             return false;
           }
-        
+
           if (isDebugEnabled) {
             logger.debug("tryReconnect: forcedDisconnect={} sqlf listener={}", forcedDisconnect, this.sqlfDisconnectListener);
           }
@@ -2496,7 +2494,7 @@ public final class InternalDistributedSystem
             // allow the fabric-service to stop before dismantling everything
             notifySqlfForcedDisconnectListener();
 
-            if (this.config.getDisableAutoReconnect()) {
+            if (config.getDisableAutoReconnect()) {
               if (isDebugEnabled) {
                 logger.debug("tryReconnect: auto reconnect after forced disconnect is disabled");
               }
@@ -2504,7 +2502,7 @@ public final class InternalDistributedSystem
             }
           }
           reconnect(forcedDisconnect, reason);
-          return this.reconnectDS != null && this.reconnectDS.isConnected();
+          return (this.reconnectDS != null && this.reconnectDS.isConnected());
         } // synchronized reconnectLock
       } // synchronized cache
     } // synchronized CacheFactory.class
@@ -2577,7 +2575,6 @@ public final class InternalDistributedSystem
       logger.debug("changing thread name to ReconnectThread");
     }
     Thread.currentThread().setName("ReconnectThread");
-    Thread.currentThread().setDaemon(false);
     
     // get the membership manager for quorum checks
     MembershipManager mbrMgr = this.dm.getMembershipManager();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 9382d9c..c364b4d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -5,6 +5,7 @@ import java.util.Timer;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.ForcedDisconnectException;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
@@ -49,6 +50,7 @@ public class Services {
   final private Stopper cancelCriterion;
   private volatile boolean stopping;
   private volatile boolean stopped;
+  private volatile Exception shutdownCause;
 
   private InternalLogWriter logWriter;
   private InternalLogWriter securityLogWriter;
@@ -293,8 +295,21 @@ public class Services {
     return this.cancelCriterion;
   }
   
+  public void setShutdownCause(Exception e) {
+    this.shutdownCause = e;
+  }
   
+  public Exception getShutdownCause() {
+    return shutdownCause;
+  }
   
+  public boolean isShutdownDueToForcedDisconnect() {
+    return shutdownCause instanceof ForcedDisconnectException;
+  }
+  
+  public boolean isAutoReconnectEnabled() {
+    return !getConfig().getDistributionConfig().getDisableAutoReconnect();
+  }
   
   public static class Stopper extends CancelCriterion {
     volatile String reasonForStopping = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
index 7bf35ec..f8584e5 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
@@ -37,7 +37,13 @@ public interface JoinLeave extends Service {
    * returns the current membership view
    */
   NetView getView();
-
+  
+  
+  /**
+   * returns the last known view prior to close - for reconnecting
+   */
+  NetView getPreviousView();
+  
   /**
    * test hook
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
index 4539909..bd9274e 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Manager.java
@@ -82,14 +82,6 @@ public interface Manager extends Service, MessageHandler {
   boolean isMulticastAllowed();
   
   /**
-   * This establishes the reason that a shutdown is being performed.
-   * After this the cancelCriterion will start reporting that a
-   * cancel is in progress.
-   * @param e the reason for the shutdown
-   */
-  void setShutdownCause(Exception e);
-  
-  /**
    * Returns the reason for a shutdown. 
    */
   Throwable getShutdownCause();
@@ -110,4 +102,10 @@ public interface Manager extends Service, MessageHandler {
    * suspicion
    */
   void memberSuspected(SuspectMember suspect);
+  
+  /**
+   * Indicate whether we are attempting a reconnect
+   */
+  boolean isReconnectingDS();
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index d01dcd8..b154403 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -1,10 +1,10 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.interfaces;
 
-import java.io.IOException;
 import java.util.Set;
 
 import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
 
 public interface Messenger extends Service {
   /**
@@ -22,4 +22,9 @@ public interface Messenger extends Service {
    * returns the endpoint ID for this member
    */
   InternalDistributedMember getMemberID();
+  
+  /**
+   * returns a quorum checker for use during an auto-reconnect attempt
+   */
+  QuorumChecker getQuorumChecker();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index f88e1c0..8c55298 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -104,6 +104,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   /** the currently installed view */
   private volatile NetView currentView;
   
+  /** the previous view **/
+  private volatile NetView previousView;
+  
   private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
   
   /** a new view being installed */
@@ -734,6 +737,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   public NetView getView() {
     return currentView;
   }
+  
+  public NetView getPreviousView() {
+    return previousView;
+  }
 
   @Override
   public InternalDistributedMember getMemberID() {
@@ -771,7 +778,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           return;
         }
       }
-      
+      previousView = currentView;
       currentView = newView;
       preparedView = null;
       lastConflictingView = null;
@@ -1265,7 +1272,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       Set<InternalDistributedMember> leaveReqs = new HashSet<>();
       Set<InternalDistributedMember> removalReqs = new HashSet<>();
       List<String> removalReasons = new ArrayList<String>();
-      
+
       NetView oldView = currentView;
       List<InternalDistributedMember> oldMembers;
       if (oldView != null) {
@@ -1282,16 +1289,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         if (msg instanceof JoinRequestMessage) {
           mbr = ((JoinRequestMessage)msg).getMemberID();
 
-          boolean duplicate = false;
-          for (InternalDistributedMember m: oldMembers) {
-            // check the netMembers, which wildcards the
-            // viewID to detect old IDs still in the view
-            if (mbr.getNetMember().equals(m.getNetMember())) {
-              duplicate = true;
-              break;
-            }
-          }
-          if (!duplicate && !joinReqs.contains(mbr)) {
+          if (!joinReqs.contains(mbr)) {
             joinReqs.add(mbr);
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java
new file mode 100644
index 0000000..4384032
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java
@@ -0,0 +1,41 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import org.jgroups.Address;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+
+import com.gemstone.gemfire.internal.Version;
+
+public class GMSPingPonger {
+  private byte[] pingInBytes = new byte[] { 'p', 'i', 'n', 'g' };
+  private byte[] pongInBytes = new byte[] { 'p', 'o', 'n', 'g' };
+  
+  public boolean isPingMessage(byte[] buffer) {
+    return buffer.length == 4 && (buffer[0] == 'p' && buffer[1] == 'i' && buffer[2] == 'n' && buffer[3] == 'g');
+  }
+  
+  public boolean isPongMessage(byte[] buffer) {
+    return buffer.length == 4 && (buffer[0] == 'p' && buffer[1] == 'o' && buffer[2] == 'n' && buffer[3] == 'g');
+  }
+  
+  public void sendPongMessage(JChannel channel, Address src, Address dest) throws Exception {
+    channel.send(createJGMessage(pongInBytes, src, dest, Version.CURRENT_ORDINAL)); 
+  }
+  
+  public Message createPongMessage(Address src, Address dest) {
+	  return createJGMessage(pongInBytes, src, dest, Version.CURRENT_ORDINAL);
+  }
+  
+  public void sendPingMessage(JChannel channel, Address src, JGAddress dest) throws Exception {
+    channel.send(createJGMessage(pingInBytes, src, dest, Version.CURRENT_ORDINAL));
+  }
+
+  private Message createJGMessage(byte[] msgBytes, Address src, Address dest, short version) {
+	Message msg = new Message();
+	msg.setDest(dest);
+	msg.setSrc(src);
+	msg.setObject(msgBytes);
+	return msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
new file mode 100644
index 0000000..fead37b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumChecker.java
@@ -0,0 +1,243 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.logging.log4j.Logger;
+import org.jgroups.Address;
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.View;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
+import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class GMSQuorumChecker implements QuorumChecker {
+  private static final Logger logger = LogService.getLogger();
+  private boolean isDebugEnabled = false;
+  private Map<SocketAddress, InternalDistributedMember> addressConversionMap;
+  private GMSPingPonger pingPonger;
+
+  private Set<InternalDistributedMember> receivedAcks;
+
+  private NetView lastView;
+
+  // guarded by this
+  private boolean quorumAchieved = false;
+  private JChannel channel;
+  private JGAddress myAddress;
+  private int partitionThreshold;
+
+  public GMSQuorumChecker(NetView jgView, int partitionThreshold, JChannel channel) {
+    this.lastView = jgView;
+    this.partitionThreshold = partitionThreshold;
+    this.channel = channel;
+  }
+
+  public void initialize() {
+    receivedAcks = new ConcurrentHashSet<InternalDistributedMember>();
+
+    pingPonger = new GMSPingPonger();
+//    UUID logicalAddress = (UUID) channel.getAddress();
+//    IpAddress ipaddr = (IpAddress) channel.down(new Event(Event.GET_PHYSICAL_ADDRESS));
+//    
+//    myAddress = new JGAddress(logicalAddress, ipaddr);
+    myAddress = (JGAddress)channel.down(new Event(Event.GET_LOCAL_ADDRESS));
+
+    addressConversionMap = new ConcurrentHashMap<SocketAddress, InternalDistributedMember>(this.lastView.size());
+    List<InternalDistributedMember> members = this.lastView.getMembers();
+    for (InternalDistributedMember addr : members) {
+      SocketAddress sockaddr = new InetSocketAddress(addr.getNetMember().getInetAddress(), addr.getPort());
+      addressConversionMap.put(sockaddr, addr);
+    }
+
+    isDebugEnabled = logger.isDebugEnabled();
+    resume();
+  }
+
+  @Override
+  public synchronized boolean checkForQuorum(long timeout) throws InterruptedException {
+    if (quorumAchieved) {
+      return true;
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("beginning quorum check with {}", this);
+    }
+    try {
+      sendPingMessages();
+      quorumAchieved = waitForResponses(lastView.getMembers().size(), timeout);
+      // If we did not achieve full quorum, calculate if we achieved quorum
+      if (!quorumAchieved) {
+        quorumAchieved = calculateQuorum();
+      }
+    } finally {
+
+    }
+    return quorumAchieved;
+  }
+
+  @Override
+  public void suspend() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void resume() {
+    channel.setReceiver(null);
+    channel.setReceiver(new QuorumCheckerReceiver());
+  }
+
+  @Override
+  public Object getMembershipInfo() {
+    return channel;
+  }
+
+  private boolean calculateQuorum() {
+    // quorum check
+    int weight = getWeight(this.lastView.getMembers(), this.lastView.getLeadMember());
+    int ackedWeight = getWeight(receivedAcks, this.lastView.getLeadMember());
+    int lossThreshold = (int) Math.round((weight * this.partitionThreshold) / 100.0);
+    if (isDebugEnabled) {
+      logger.debug("quorum check: contacted {} processes with {} member weight units.  Threshold for a quorum is {}", receivedAcks.size(), ackedWeight, lossThreshold);
+    }
+    return (ackedWeight >= lossThreshold);
+  }
+
+  private boolean waitForResponses(int numMembers, long timeout) throws InterruptedException {
+    long endTime = System.currentTimeMillis() + timeout;
+    for (;;) {
+      long time = System.currentTimeMillis();
+      long remaining = (endTime - time);
+      if (remaining <= 0) {
+        if (isDebugEnabled) {
+          logger.debug("quorum check: timeout waiting for responses.  {} responses received", receivedAcks.size());
+        }
+        break;
+      }
+      if (isDebugEnabled) {
+        logger.debug("quorum check: waiting up to {}ms to receive a quorum of responses", remaining);
+      }
+      Thread.sleep(500);
+      if (receivedAcks.size() == numMembers) {
+        // we've heard from everyone now so we've got a quorum
+        if (isDebugEnabled) {
+          logger.debug("quorum check: received responses from all members that were in the old distributed system");
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private int getWeight(Collection<InternalDistributedMember> idms, InternalDistributedMember leader) {
+    int weight = 0;
+    for (InternalDistributedMember mbr : idms) {
+      int thisWeight = mbr.getNetMember().getMemberWeight();
+      if (mbr.getVmKind() == 10 /* NORMAL_DM_KIND */) {
+        thisWeight += 10;
+        if (leader != null && mbr.equals(leader)) {
+          thisWeight += 5;
+        }
+      } else if (mbr.getNetMember().preferredForCoordinator()) {
+        thisWeight += 3;
+      }
+      weight += thisWeight;
+    }
+    return weight;
+  }
+
+  private void sendPingMessages() {
+    // send a ping message to each member in the last view seen
+    List<InternalDistributedMember> members = this.lastView.getMembers();
+    for (InternalDistributedMember addr : members) {
+      if (!receivedAcks.contains(addr)) {
+        JGAddress dest = new JGAddress(addr);
+        if (isDebugEnabled) {
+          logger.debug("quorum check: sending request to {}", addr);
+        }
+        try {
+          pingPonger.sendPingMessage(channel, myAddress, dest);
+        } catch (Exception e) {
+          logger.info("Failed sending Ping message to " + dest);
+        }
+      }
+    }
+  }
+
+  private class QuorumCheckerReceiver implements Receiver {
+
+    @Override
+    public void receive(Message msg) {
+      Object contents = msg.getBuffer();
+      if (contents instanceof byte[]) {
+        byte[] msgBytes = (byte[]) contents;
+        if (pingPonger.isPingMessage(msgBytes)) {
+          try {
+            pingPonger.sendPongMessage(channel, myAddress, msg.getSrc());
+          } catch (Exception e) {
+            logger.info("Failed sending Pong message to " + msg.getSrc());
+          }
+        } else if (pingPonger.isPongMessage(msgBytes)) {
+          pongReceived(msg.getSrc());
+        }
+      }
+    }
+
+    @Override
+    public void getState(OutputStream output) throws Exception {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void setState(InputStream input) throws Exception {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void viewAccepted(View new_view) {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void suspect(Address suspected_mbr) {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void block() {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void unblock() {
+      // TODO Auto-generated method stub
+    }
+
+    public void pongReceived(Address sender) {
+      logger.info("received ping-pong response from {}", sender);
+      JGAddress jgSender = (JGAddress) sender;
+      SocketAddress sockaddr = new InetSocketAddress(jgSender.getInetAddress(), jgSender.getPort());
+      InternalDistributedMember memberAddr = addressConversionMap.get(sockaddr);
+
+      if (memberAddr != null) {
+        logger.info("quorum check: mapped address to member ID {}", memberAddr);
+        receivedAcks.add(memberAddr);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 1bf0213..c7c6a19 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -1,8 +1,7 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 
 import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -26,8 +25,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
-
 import org.apache.logging.log4j.Logger;
 import org.jgroups.Address;
 import org.jgroups.Event;
@@ -57,6 +54,7 @@ import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
 import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
 import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
@@ -75,6 +73,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 public class JGroupsMessenger implements Messenger {
 
   private static final Logger logger = Services.getLogger();
@@ -115,6 +115,9 @@ public class JGroupsMessenger implements Messenger {
 
   private View jgView;
   
+  private GMSPingPonger pingPonger = new GMSPingPonger();
+
+  
   static {
     // register classes that we've added to jgroups that are put on the wire
     ClassConfigurator.add(JGroupsMessenger.JGROUPS_TYPE_JGADDRESS, JGAddress.class);
@@ -261,7 +264,7 @@ public class JGroupsMessenger implements Messenger {
     }
 
     try {
-      
+      myChannel.setReceiver(null);
       myChannel.setReceiver(new JGroupsReceiver());
       if (!reconnecting) {
         myChannel.connect("AG"); // apache g***** (whatever we end up calling it)
@@ -283,7 +286,12 @@ public class JGroupsMessenger implements Messenger {
   @Override
   public void stop() {
     if (this.myChannel != null) {
-      this.myChannel.close();
+      if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) || services.getManager().isReconnectingDS()) {
+        
+      }
+      else {
+        this.myChannel.close();
+      }
     }
   }
 
@@ -312,6 +320,7 @@ public class JGroupsMessenger implements Messenger {
   
   private void establishLocalAddress() {
     UUID logicalAddress = (UUID)myChannel.getAddress();
+    logicalAddress = logicalAddress.copy();
     
     IpAddress ipaddr = (IpAddress)myChannel.down(new Event(Event.GET_PHYSICAL_ADDRESS));
     
@@ -450,7 +459,7 @@ public class JGroupsMessenger implements Messenger {
       }
       if (problem != null) {
         if (services.getManager().getShutdownCause() != null) {
-          Throwable cause = services.getManager().getShutdownCause();
+          Throwable cause = services.getShutdownCause();
           // If ForcedDisconnectException occurred then report it as actual
           // problem.
           if (cause instanceof ForcedDisconnectException) {
@@ -460,7 +469,7 @@ public class JGroupsMessenger implements Messenger {
             while (ne.getCause() != null) {
               ne = ne.getCause();
             }
-            ne.initCause(services.getManager().getShutdownCause());
+            ne.initCause(services.getShutdownCause());
           }
         }
         final String channelClosed = LocalizedStrings.GroupMembershipService_CHANNEL_CLOSED.toLocalizedString();
@@ -757,10 +766,21 @@ public class JGroupsMessenger implements Messenger {
   public void emergencyClose() {
     this.view = null;
     if (this.myChannel != null) {
-      this.myChannel.disconnect();
+      if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) || services.getManager().isReconnectingDS()) {
+      }
+      else {
+        this.myChannel.disconnect();
+      }
     }
   }
   
+  public QuorumChecker getQuorumChecker() {    
+    GMSQuorumChecker qc = new GMSQuorumChecker(
+          services.getJoinLeave().getPreviousView(), services.getConfig().getLossThreshold(),
+          this.myChannel);
+    qc.initialize();
+    return qc;
+  }
   /**
    * Puller receives incoming JGroups messages and passes them to a handler
    */
@@ -776,6 +796,21 @@ public class JGroupsMessenger implements Messenger {
         logger.trace("JGroupsMessenger received {} headers: {}", jgmsg, jgmsg.getHeaders());
       }
       
+      // Respond to ping messages sent from other systems that are in an auto-reconnect state
+      Object contents = jgmsg.getBuffer();
+      if (contents instanceof byte[]) {
+          byte[] msgBytes = (byte[]) contents;
+  	    if (pingPonger.isPingMessage(msgBytes)) {
+  	    	try {
+  	    	  pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc());
+            }
+            catch (Exception e) {
+              logger.info("Failed sending Pong message to " + jgmsg.getSrc());
+            }
+  	        return;
+  	    }
+      }
+      
       Object o = readJGMessage(jgmsg);
       if (o == null) {
         logger.warn(LocalizedMessage.create(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index d58bef1..b873154 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@ -9,9 +9,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.mgr;
 
 import java.io.IOException;
 import java.io.NotSerializableException;
-import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
-import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -31,6 +29,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
+import org.jgroups.JChannel;
 
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.ForcedDisconnectException;
@@ -68,7 +67,7 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.JGroupsQuorumChecker;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.GMSQuorumChecker;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.Version;
@@ -108,12 +107,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
   private volatile QuorumChecker quorumChecker;
   
   /**
-   * during an auto-reconnect attempt set this to the old DistributedSystem's
-   * UDP port socket.  The failure detection protocol will pick it up and use it.
-   */
-  private volatile DatagramSocket oldDSUDPSocket;
-  
-  /**
    * thread-local used to force use of JGroups for communications, usually to
    * avoid deadlock when conserve-sockets=true.  Use of this should be removed
    * when connection pools are implemented in the direct-channel 
@@ -757,9 +750,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
         LocalizedStrings.GroupMembershipService_MEMBERSHIP_FINISHED_VIEW_PROCESSING_VIEWID___0, Long.valueOf(newViewId)));
   }
 
-  /** an exception that caused the manager to shut down */
-  volatile Exception shutdownCause;
-
   /**
    * the timer used to perform periodic tasks
    * @guarded.By latestViewLock
@@ -785,7 +775,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
    * @throws SystemConnectException - problem joining
    */
   private void join() {
-    this.shutdownCause = null;
+    services.setShutdownCause(null);
     
     latestViewLock.writeLock().lock();
     try {
@@ -843,7 +833,6 @@ public class GMSMembershipManager implements MembershipManager, Manager
 
     this.membershipCheckTimeout = config.getSecurityPeerMembershipTimeout();
     this.wasReconnectingSystem = transport.getIsReconnectingDS();
-    this.oldDSUDPSocket = (DatagramSocket)transport.getOldDSMembershipInfo();
     
     // cache these settings for use in send()
     this.mcastEnabled = transport.isMcastEnabled();
@@ -1766,8 +1755,8 @@ public class GMSMembershipManager implements MembershipManager, Manager
   public void uncleanShutdown(String reason, final Exception e) {
     inhibitForcedDisconnectLogging(false);
     
-    if (this.shutdownCause == null) {
-      this.shutdownCause = e;
+    if (services.getShutdownCause() == null) {
+      services.setShutdownCause(e);
     }
     
     if (this.directChannel != null) {
@@ -1852,8 +1841,8 @@ public class GMSMembershipManager implements MembershipManager, Manager
     }
     catch (RuntimeException e) {
       Throwable problem = e;
-      if (this.shutdownCause != null) {
-        Throwable cause = this.shutdownCause;
+      if (services.getShutdownCause() != null) {
+        Throwable cause = services.getShutdownCause();
         // If ForcedDisconnectException occurred then report it as actual
         // problem.
         if (cause instanceof ForcedDisconnectException) {
@@ -1864,7 +1853,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
             ne = ne.getCause();
           }
           try {
-            ne.initCause(this.shutdownCause);
+            ne.initCause(services.getShutdownCause());
           }
           catch (IllegalArgumentException selfCausation) {
             // fix for bug 38895 - the cause is already in place
@@ -1948,8 +1937,8 @@ public class GMSMembershipManager implements MembershipManager, Manager
         theStats.incSentBytes(sentBytes);
     }
     catch (DistributedSystemDisconnectedException ex) {
-      if (this.shutdownCause != null) {
-        throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", this.shutdownCause);
+      if (services.getShutdownCause() != null) {
+        throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", services.getShutdownCause());
       } else {
         throw ex; // see bug 41416
       }
@@ -2029,50 +2018,28 @@ public class GMSMembershipManager implements MembershipManager, Manager
     }
   }
   
-  /**
-   * During jgroups connect the UDP protocol will invoke
-   * this method to find the DatagramSocket it should use instead of
-   * creating a new one.
-   */
-  public DatagramSocket getMembershipSocketForUDP() {
-    return this.oldDSUDPSocket;
-  }
-  
   @Override
   public QuorumChecker getQuorumChecker() {
-    if ( ! (this.shutdownCause instanceof ForcedDisconnectException) ) {
+    if ( ! (services.isShutdownDueToForcedDisconnect()) ) {
       return null;
     }
     if (this.quorumChecker != null) {
       return this.quorumChecker;
     }
-    try {
-      // TODO: creation of the quorum checker should be delegated to the
-      // Messenger component.  For JGroups we we really need JChannel instead
-      // of a datagram socket because jgroup
-      // doesn't have the "ping" handling that I built into the TP protocol.s
-      DatagramSocket sock = new DatagramSocket(this.address.getPort(),
-                               this.address.getNetMember().getInetAddress());
-      JGroupsQuorumChecker impl = new JGroupsQuorumChecker(
-          services.getJoinLeave().getView(), services.getConfig().getLossThreshold(),
-          sock);
-      impl.initialize();
-      this.quorumChecker = impl;
-      return impl;
-    } catch (SocketException e) {
-      logger.warn("unable to create quorum checker", e);
-      return null;
-    }
+
+    QuorumChecker impl = services.getMessenger().getQuorumChecker();
+    this.quorumChecker = impl;
+    return impl;
   }
   
   @Override
   public void releaseQuorumChecker(QuorumChecker checker) {
-    ((JGroupsQuorumChecker)checker).teardown();
+    ((GMSQuorumChecker)checker).suspend();
     InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
     if (system == null || !system.isConnected()) {
-      DatagramSocket sock = (DatagramSocket)checker.getMembershipInfo();
-      if (sock != null  &&  !sock.isClosed()) {
-        sock.close();
+      JChannel channel = (JChannel)checker.getMembershipInfo();
+      if (channel != null  &&  !channel.isClosed()) {
+        channel.close();
       }
     }
   }
@@ -2169,7 +2136,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
   public Stub getStubForMember(InternalDistributedMember m)
   {
     if (shutdownInProgress) {
-      throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
+      throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
     }
 
     if (services.getConfig().getDistributionConfig().getDisableTcp()) {
@@ -2205,7 +2172,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
     latestViewLock.readLock().lock();
     try {
       if (shutdownInProgress) {
-        throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), this.shutdownCause);
+        throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause());
       }
       InternalDistributedMember result = (InternalDistributedMember)
           stubToMemberMap.get(s);
@@ -2704,11 +2671,7 @@ public class GMSMembershipManager implements MembershipManager, Manager
   
   /* returns the cause of shutdown, if known */
   public Throwable getShutdownCause() {
-    return this.shutdownCause;
-  }
-  
-  public void setShutdownCause(Exception t) {
-    this.shutdownCause = t;
+    return services.getShutdownCause();
   }
 
 //  @Override
@@ -2900,48 +2863,47 @@ public class GMSMembershipManager implements MembershipManager, Manager
   }
 
   @Override
-  public void forceDisconnect(String reason) {
-    if (GMSMembershipManager.this.shutdownInProgress) {
-      return;  // probably a race condition
-    }
-    saveCacheXmlForReconnect();
-    // make sure that we've received a connected channel and aren't responsible
-    // for the notification
-    if (!isJoining()) {
-
-      AlertAppender.getInstance().shuttingDown();
-
-//      Exception logException = e;
-//      if (e instanceof ForcedDisconnectException) {
-//        reason = "Membership closed: " + e;
-//        logException = null;
-//      }
-//      else {
-//        reason = "Membership closed";
-//      }
-
-      services.getCancelCriterion().cancel(reason);
-      // cache the exception so it can be appended to ShutdownExceptions
-      shutdownCause = new ForcedDisconnectException(reason);
-
-      if (!inhibitForceDisconnectLogging) {
-        logger.fatal(LocalizedMessage.create(
-            LocalizedStrings.GroupMembershipService_MEMBERSHIP_SERVICE_FAILURE_0, reason), shutdownCause);
-      }
-      
-      services.emergencyClose();
-
-      // stop server locators immediately since they may not have correct
-      // information.  This has caused client failures in bridge/wan
-      // network-down testing
-      InternalLocator loc = (InternalLocator)Locator.getLocator();
-      if (loc != null) {
-        loc.stop(!services.getConfig().getDistributionConfig()
-                     .getDisableAutoReconnect(), false);
+  public void forceDisconnect(final String reason) {
+	if (GMSMembershipManager.this.shutdownInProgress) {
+		return; // probably a race condition
+	}
+	saveCacheXmlForReconnect();
+    Thread reconnectThread = new Thread (new Runnable() {
+      public void run() {
+        // make sure that we've received a connected channel and aren't responsible
+        // for the notification
+        if (!isJoining()) {
+
+          AlertAppender.getInstance().shuttingDown();
+
+          services.getCancelCriterion().cancel(reason);
+          // cache the exception so it can be appended to ShutdownExceptions
+          Exception shutdownCause = new ForcedDisconnectException(reason);
+          services.setShutdownCause(shutdownCause);
+
+          if (!inhibitForceDisconnectLogging) {
+            logger.fatal(LocalizedMessage.create(
+                LocalizedStrings.GroupMembershipService_MEMBERSHIP_SERVICE_FAILURE_0, reason), shutdownCause);
+          }
+          
+          services.emergencyClose();
+
+          // stop server locators immediately since they may not have correct
+          // information.  This has caused client failures in bridge/wan
+          // network-down testing
+          InternalLocator loc = (InternalLocator)Locator.getLocator();
+          if (loc != null) {
+            loc.stop(!services.getConfig().getDistributionConfig()
+                         .getDisableAutoReconnect(), false);
+          }
+          
+          uncleanShutdown(reason, shutdownCause);
+        }
       }
-      
-      uncleanShutdown(reason, shutdownCause);
-    }
+    });
+    reconnectThread.setName("DisconnectThread");
+    reconnectThread.setDaemon(false);
+    reconnectThread.start();
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumCheckerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumCheckerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumCheckerJUnitTest.java
new file mode 100644
index 0000000..098e37b
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSQuorumCheckerJUnitTest.java
@@ -0,0 +1,345 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.jgroups.Event;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.stack.IpAddress;
+import org.jgroups.util.UUID;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class GMSQuorumCheckerJUnitTest {
+
+  private InternalDistributedMember[] mockMembers;
+
+  private Services services;
+
+  private JChannel channel;
+  
+  private JGAddress address;
+
+  /** Mockito answer for JChannel.send: counts ping messages and simulates pong replies. */
+  private static class PingMessageAnswer implements Answer<Object> {
+    private int pingCount = 0;
+    private final JChannel channel;
+    private final GMSPingPonger pingPonger = new GMSPingPonger();
+    private final Set<Integer> simulatedPongRespondersByPort;
+
+    public PingMessageAnswer(JChannel channel, Set<Integer> simulatedPongRespondersByPort) {
+      this.channel = channel;
+      this.simulatedPongRespondersByPort = simulatedPongRespondersByPort;
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      for (Object arg : invocation.getArguments()) {
+        if (arg instanceof Message) {
+          Message msg = (Message) arg;
+          Object content = msg.getBuffer();
+          if (content instanceof byte[]) {
+            if (pingPonger.isPingMessage((byte[]) content)) {
+              pingCount++;
+              // only destinations whose port was registered as a responder get a pong fed back
+              if (simulatedPongRespondersByPort.contains(((JGAddress)msg.getDest()).getPort())) {
+                channel.getReceiver().receive(pingPonger.createPongMessage(msg.getDest(), msg.getSrc()));
+              }
+            }
+          }
+        }
+      }
+      return null;
+    }
+
+    /** Returns how many ping messages have been seen so far. */
+    public int getPingCount() {
+      return pingCount;
+    }
+  }
+
+  /** Rebuilds the twelve localhost members and the mocked JChannel before each test. */
+  @Before
+  public void initMocks() throws Exception {
+    mockMembers = new InternalDistributedMember[12];
+    for (int i = 0; i < mockMembers.length; i++) {
+      mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
+    }
+    channel = mock(JChannel.class);
+    address = mock(JGAddress.class);
+    when(channel.getAddress()).thenReturn(new UUID());
+    Mockito.doCallRealMethod().when(channel).setReceiver(any(Receiver.class));
+    when(channel.getReceiver()).thenCallRealMethod(); // PingMessageAnswer feeds pongs to this receiver
+    Mockito.doReturn(address).when(channel).down(any(Event.class));
+  }
+  
+  /** Builds a view over all of mockMembers. */
+  private NetView prepareView() throws IOException {
+    return prepareView(mockMembers.length);
+  }
+
+  /** Builds a view (id 1) over the first numMembers entries of mockMembers. */
+  private NetView prepareView(int numMembers) throws IOException {
+    List<InternalDistributedMember> members = new LinkedList<InternalDistributedMember>();
+    for (int idx = 0; idx < numMembers; idx++) {
+      members.add(mockMembers[idx]);
+    }
+    // no departed members: both the shutdown and the crash set are passed in empty
+    Set<InternalDistributedMember> noShutdowns = new HashSet<InternalDistributedMember>();
+    Set<InternalDistributedMember> noCrashes = new HashSet<InternalDistributedMember>();
+    // mockMembers[0] is passed as the first constructor argument (presumably the view creator)
+    return new NetView(mockMembers[0], 1, members, noShutdowns, noCrashes);
+  }
+
+  /** Every member answers the ping: a quorum is reported and the channel is exposed as membership info. */
+  @Test
+  public void testQuorumCheckerAllRespond() throws Exception {
+    NetView view = prepareView();
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertTrue(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+    Assert.assertTrue(qc.checkForQuorum(500)); // a repeated check must succeed as well
+    Assert.assertSame(channel, qc.getMembershipInfo());
+  }
+  
+  /** All members except the last one answer the ping; a quorum must still be reported. */
+  @Test
+  public void testQuorumCheckerMajorityRespond() throws Exception {
+    NetView view = prepareView();
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length - 1; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertTrue(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** Only the first member answers the ping; no quorum may be reported. */
+  @Test
+  public void testQuorumCheckerNotEnoughWeightForQuorum() throws Exception {
+    NetView view = prepareView();
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    pongResponders.add(mockMembers[0].getPort());
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertFalse(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** Nobody answers the ping; no quorum may be reported, yet every member must have been pinged. */
+  @Test
+  public void testQuorumCheckerNoQuorumNoResponders() throws Exception {
+    NetView view = prepareView();
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertFalse(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** With members 0 and 1 marked as locators, losing members 8-11 must still leave a quorum. */
+  @Test
+  public void testQuorumChecker10Servers2Locators4ServersLost() throws Exception {
+    NetView view = prepareView();
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove 4 servers
+    pongResponders.remove(mockMembers[8].getPort());
+    pongResponders.remove(mockMembers[9].getPort());
+    pongResponders.remove(mockMembers[10].getPort());
+    pongResponders.remove(mockMembers[11].getPort());
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertTrue(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** With members 0 and 1 marked as locators, losing members 8-11 plus locator 1 must still leave a quorum. */
+  @Test
+  public void testQuorumChecker10Servers2Locators4ServersAnd1LocatorLost() throws Exception {
+    NetView view = prepareView();
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove 4 servers
+    pongResponders.remove(mockMembers[8].getPort());
+    pongResponders.remove(mockMembers[9].getPort());
+    pongResponders.remove(mockMembers[10].getPort());
+    pongResponders.remove(mockMembers[11].getPort());
+    //remove 1 locator
+    pongResponders.remove(mockMembers[1].getPort());
+
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertTrue(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** Losing members 7-11 and both locators (0 and 1) must cost the quorum even though member 2 still answers. */
+  @Test
+  public void testQuorumChecker10Servers2Locators5ServersAnd2LocatorsButNotLeadMemberLost() throws Exception {
+    NetView view = prepareView();
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove 5 servers
+    pongResponders.remove(mockMembers[7].getPort());
+    pongResponders.remove(mockMembers[8].getPort());
+    pongResponders.remove(mockMembers[9].getPort());
+    pongResponders.remove(mockMembers[10].getPort());
+    pongResponders.remove(mockMembers[11].getPort());
+    //remove locators
+    pongResponders.remove(mockMembers[0].getPort());
+    pongResponders.remove(mockMembers[1].getPort());
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertFalse(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** Losing member 2 (the lead member), members 8-11 and locator 0 must cost the quorum. */
+  @Test
+  public void testQuorumChecker10Servers2Locators5ServerAnd1LocatorWithLeadMemberLost() throws Exception {
+    NetView view = prepareView();
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < mockMembers.length; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove 5 servers
+    pongResponders.remove(mockMembers[2].getPort()); //lead member
+    pongResponders.remove(mockMembers[8].getPort());
+    pongResponders.remove(mockMembers[9].getPort());
+    pongResponders.remove(mockMembers[10].getPort());
+    pongResponders.remove(mockMembers[11].getPort());
+
+    //remove locator
+    pongResponders.remove(mockMembers[0].getPort());
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertFalse(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** In a four-member view with two locators, losing only member 2 (the lead member) must keep the quorum. */
+  @Test
+  public void testQuorumChecker2Servers2LocatorsLeadMemberLost() throws Exception {
+    int numMembers = 4;
+    NetView view = prepareView(numMembers);
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < numMembers; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove lead member
+    pongResponders.remove(mockMembers[2].getPort()); //lead member
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertTrue(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+  /** In a four-member view with two locators, losing member 2 (the lead member) and locator 0 must cost the quorum. */
+  @Test
+  public void testQuorumChecker2Servers2LocatorsLeadMemberAnd1LocatorLost() throws Exception {
+    int numMembers = 4;
+    NetView view = prepareView(numMembers);
+    mockMembers[0].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+    mockMembers[1].setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+
+    Set<Integer> pongResponders = new HashSet<Integer>();
+    for (int i = 0; i < numMembers; i++) {
+      pongResponders.add(mockMembers[i].getPort());
+    }
+    //remove members
+    pongResponders.remove(mockMembers[2].getPort()); //lead member
+    pongResponders.remove(mockMembers[0].getPort()); //locator
+
+    PingMessageAnswer answerer = new PingMessageAnswer(channel, pongResponders);
+    Mockito.doAnswer(answerer).when(channel).send(any(Message.class));
+
+    GMSQuorumChecker qc = new GMSQuorumChecker(view, 51, channel);
+    qc.initialize();
+    Assert.assertFalse(qc.checkForQuorum(500));
+    Assert.assertEquals(view.getMembers().size(), answerer.getPingCount()); // one ping per view member
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/99e50c12/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 3b1c1dc..83c1419 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -1,6 +1,7 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -10,14 +11,17 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Properties;
 
+import junit.framework.Assert;
+
 import org.jgroups.Event;
 import org.jgroups.Message;
 import org.jgroups.util.UUID;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
+import com.gemstone.gemfire.ForcedDisconnectException;
 import com.gemstone.gemfire.distributed.internal.DMStats;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
@@ -211,6 +215,154 @@ public class JGroupsMessengerJUnitTest {
         sentMessages == 2);
   }
   
+  @Test // Expects the channel to stay connected across messenger.emergencyClose() once a ForcedDisconnectException is the recorded shutdown cause.
+  public void testChannelStillConnectedAfterEmergencyCloseAfterForcedDisconnectWithAutoReconnect() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); // run the real setter so the cause is actually stored
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); // real logic rather than a canned answer
+    Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled(); // real logic; NOTE(review): the value it yields here is not visible in this hunk - confirm it is true
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before the close call
+    messenger.emergencyClose();
+    assertTrue(messenger.myChannel.isConnected()); // expectation: still connected afterwards
+  }
+  
+  @Test // Expects the channel to stay connected across messenger.stop() once a ForcedDisconnectException is the recorded shutdown cause.
+  public void testChannelStillConnectedAfterStopAfterForcedDisconnectWithAutoReconnect() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class)); // run the real setter so the cause is actually stored
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doCallRealMethod().when(services).isShutdownDueToForcedDisconnect(); // real logic rather than a canned answer
+    Mockito.doCallRealMethod().when(services).isAutoReconnectEnabled(); // real logic; NOTE(review): the value it yields here is not visible in this hunk - confirm it is true
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before stop()
+    messenger.stop();
+    assertTrue(messenger.myChannel.isConnected()); // expectation: still connected afterwards
+  }
+  
+  @Test // Expects the channel to stay connected across messenger.emergencyClose() while manager.isReconnectingDS() reports true.
+  public void testChannelStillConnectedAfteremergencyWhileReconnectingDS() throws Exception { // NOTE(review): name has inconsistent casing ("Afteremergency") and omits "Close"
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(true).when(manager).isReconnectingDS(); // stubbed: a reconnect is reported as in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before the close call
+    messenger.emergencyClose();
+    assertTrue(messenger.myChannel.isConnected()); // expectation: still connected afterwards
+  }
+  
+  
+  @Test // Expects the channel to stay connected across messenger.stop() while manager.isReconnectingDS() reports true.
+  public void testChannelStillConnectedAfterStopWhileReconnectingDS() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(true).when(manager).isReconnectingDS(); // stubbed: a reconnect is reported as in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before stop()
+    messenger.stop();
+    assertTrue(messenger.myChannel.isConnected()); // expectation: still connected afterwards
+  }
+  
+  @Test // Expects messenger.emergencyClose() to disconnect the channel when all three stubbed conditions report false.
+  public void testChannelClosedOnEmergencyClose() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before the close call
+    messenger.emergencyClose();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
+  
+  @Test // Expects messenger.stop() to disconnect the channel when all three stubbed conditions report false.
+  public void testChannelClosedOnStop() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before stop()
+    messenger.stop();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
+  
+  @Test // Expects messenger.emergencyClose() to disconnect the channel when a forced disconnect is reported but auto-reconnect is not.
+  public void testChannelClosedAfterEmergencyCloseForcedDisconnectWithoutAutoReconnect() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect(); // stubbed: forced disconnect reported
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before the close call
+    messenger.emergencyClose();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
+  
+  @Test // Expects messenger.stop() to disconnect the channel when a forced disconnect is reported but auto-reconnect is not.
+  public void testChannelStillConnectedStopAfterForcedDisconnectWithoutAutoReconnect() throws Exception { // NOTE(review): name says "StillConnected" but the final assertion expects the channel NOT to be connected
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(true).when(services).isShutdownDueToForcedDisconnect(); // stubbed: forced disconnect reported
+    Mockito.doReturn(false).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as disabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before stop()
+    messenger.stop();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
+  
+  @Test // Expects messenger.emergencyClose() to disconnect the channel when auto-reconnect is reported but a forced disconnect is not.
+  public void testChannelClosedAfterEmergencyCloseNotForcedDisconnectWithAutoReconnect() throws Exception {
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(true).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as enabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before the close call
+    messenger.emergencyClose();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
+  
+  @Test // Expects messenger.stop() to disconnect the channel when auto-reconnect is reported but a forced disconnect is not.
+  public void testChannelStillConnectedStopNotForcedDisconnectWithAutoReconnect() throws Exception { // NOTE(review): name says "StillConnected" but the final assertion expects the channel NOT to be connected
+    initMocks(false); // NOTE(review): helper is defined outside this hunk; presumably it creates services/manager/messenger - confirm
+    Mockito.doCallRealMethod().when(services).setShutdownCause(any(ForcedDisconnectException.class));
+    Mockito.doCallRealMethod().when(services).getShutdownCause();
+    Mockito.doCallRealMethod().when(services).emergencyClose();
+    Mockito.doReturn(false).when(services).isShutdownDueToForcedDisconnect(); // stubbed: reports false regardless of the cause set below
+    Mockito.doReturn(true).when(services).isAutoReconnectEnabled(); // stubbed: auto-reconnect reported as enabled
+    Mockito.doReturn(false).when(manager).isReconnectingDS(); // stubbed: no reconnect in progress
+    services.setShutdownCause(new ForcedDisconnectException("Test Forced Disconnect"));
+    assertTrue(messenger.myChannel.isConnected()); // precondition: connected before stop()
+    messenger.stop();
+    assertFalse(messenger.myChannel.isConnected()); // expectation: no longer connected
+  }
   
   /**
    * creates an InternalDistributedMember address that can be used


Mime
View raw message