geode-commits mailing list archives

From bschucha...@apache.org
Subject [2/2] incubator-geode git commit: GEODE-77: bug fixes
Date Thu, 05 Nov 2015 19:07:49 GMT
GEODE-77: bug fixes

GMSHealthMonitorJUnitTest was incorrectly using Mockito's any() when it should
have used isA().  Fixing this exposed many problems in the health monitor,
which this check-in addresses.  I've also renamed a number of entities so that
we now use the term "heartbeat" uniformly instead of "check".
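For context on the matcher fix: in Mockito 1.x, `any(Class)` matches every argument, including null and objects of unrelated types (the class parameter only spares a cast), while `isA(Class)` performs an instanceof check. Mockito 2 later changed `any(Class)` to type-check. The sketch below models the two matchers as plain predicates so the difference is visible without Mockito; `MatcherSemantics` and its nested message classes are hypothetical stand-ins, not Mockito or Geode code.

```java
import java.util.function.Predicate;

public class MatcherSemantics {
    // Simplified model of Mockito 1.x any(Class): accepts every argument,
    // even null or an instance of an unrelated type.
    static <T> Predicate<Object> any(Class<T> type) {
        return arg -> true;
    }

    // Simplified model of isA(Class): accepts only non-null instances of the type.
    static <T> Predicate<Object> isA(Class<T> type) {
        return type::isInstance;
    }

    // Hypothetical stand-ins for two message types.
    static class HeartbeatMessage {}
    static class SuspectMembersMessage {}

    public static void main(String[] args) {
        Object sent = new SuspectMembersMessage(); // what the code under test actually sent
        // A verification written with any() "passes" even though the wrong message went out.
        System.out.println("any(HeartbeatMessage) matches: " + any(HeartbeatMessage.class).test(sent));
        // isA() catches the mismatch.
        System.out.println("isA(HeartbeatMessage) matches: " + isA(HeartbeatMessage.class).test(sent));
    }
}
```

Run as-is, it prints `true` for the `any` line and `false` for the `isA` line, which is how a test verifying with `any()` can pass while the wrong message is being sent.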

GMSHealthMonitor now has a positive heartbeat sender thread that determines
who might be watching it and sends unsolicited heartbeats to those members.
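Recipient selection for the positive heartbeat sender can be sketched as follows, assuming (as the `setNextNeighbor` logic in the diff suggests) that each member watches the member after it in the view, so the likeliest watchers are the members just before it. `HeartbeatRecipients` and `selectRecipients` are hypothetical names; only the `geode.heartbeat-recipients` property and its default of 2 come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatRecipients {
    // Number of recipients; the diff reads this from the
    // "geode.heartbeat-recipients" system property with a default of 2.
    static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients", 2);

    /**
     * Assumes a ring in which each member watches the member that follows it,
     * so the members most likely to be watching `self` are the ones just before
     * it in the view (wrapping around the start of the list).
     */
    static <M> List<M> selectRecipients(List<M> viewMembers, M self, int count) {
        List<M> recipients = new ArrayList<>();
        int myIndex = viewMembers.indexOf(self);
        int size = viewMembers.size();
        if (myIndex < 0 || size < 2) {
            return recipients; // not in the view, or nobody else to notify
        }
        int limit = Math.min(count, size - 1); // never include ourselves
        for (int step = 1; step <= limit; step++) {
            // floorMod keeps the index non-negative when wrapping past the start
            recipients.add(viewMembers.get(Math.floorMod(myIndex - step, size)));
        }
        return recipients;
    }

    public static void main(String[] args) {
        List<String> view = List.of("A", "B", "C", "D", "E");
        // prints [A, E] when the property is unset (default of 2)
        System.out.println(selectRecipients(view, "B", NUM_HEARTBEATS));
    }
}
```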

GMSHealthMonitor now includes the viewID of the target member's ID in TCP/IP
health checks.  This enables the receiver to differentiate between the
received UUID/viewID and its own information when it is a reconnected
member (using auto-reconnect).  The response threads have also been moved
to a cached thread-pool to decrease the cost of these checks.  Responses
now have soLinger set on them (experimental) because I was seeing a lot
of checks fail with EOF even though the member had written an OK status.
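The TCP/IP check exchange in the GMSHealthMonitor diff can be modelled without sockets: the checker writes a version ordinal (short), the target's vmViewId (int), and the UUID's least- and most-significant bits (two longs), and the target replies with a single byte, OK (0x7B) or ERROR (0x00). The sketch below is a simplified, hypothetical model (`TcpCheckProtocol` is not a Geode class) using in-memory streams; it omits the playingDead test hook, soLinger, and socket timeouts.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TcpCheckProtocol {
    // Reply codes as defined in the GMSHealthMonitor diff.
    static final int OK = 0x7B;
    static final int ERROR = 0x00;

    /** Checker side: encode the target's identity in the order the diff writes it. */
    static byte[] encodeRequest(short versionOrdinal, int vmViewId, long uuidLSBs, long uuidMSBs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(versionOrdinal);
            out.writeInt(vmViewId);
            out.writeLong(uuidLSBs);
            out.writeLong(uuidMSBs);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** Target side: reply OK only if both the UUID and the view ID match its own. */
    static int handleRequest(byte[] request, int myVmViewId, long myLSBs, long myMSBs) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(request))) {
            in.readShort(); // version ordinal; not part of the identity comparison
            int vmViewId = in.readInt();
            long lsbs = in.readLong();
            long msbs = in.readLong();
            boolean sameIdentity = vmViewId == myVmViewId && lsbs == myLSBs && msbs == myMSBs;
            return sameIdentity ? OK : ERROR;
        } catch (IOException e) {
            throw new UncheckedIOException(e); // e.g. a truncated request
        }
    }

    public static void main(String[] args) {
        byte[] request = encodeRequest((short) 1, 3, 0x1234L, 0x5678L);
        // Same UUID but a different view ID: the target replies ERROR.
        System.out.println(handleRequest(request, 7, 0x1234L, 0x5678L) == OK ? "OK" : "ERROR");
    }
}
```

Comparing the view ID as well as the UUID is what lets a reconnected member decline to answer for an older identity that is still listed in the membership view.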

The health monitor now uses suspectedMemberInView to avoid suspecting the
same member over and over again.  This means that it can't be used to
avoid duplicate final-checks.  I've also disabled the collection thread for
suspect events because it was adding unnecessary delay before final-checks
were initiated on crashed members, and I have yet to see it collect more than
one event.
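A minimal sketch of using a member-to-view map to suppress repeated suspicions, assuming the goal is one suspect message per member until that member is heard from again. The diff itself calls `put` and `remove` on `suspectedMemberInView`; using `putIfAbsent` here is an illustrative choice that makes the check-and-record step atomic. `SuspectTracker` is a hypothetical class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SuspectTracker<M, V> {
    // Member -> view in which it was first suspected (models suspectedMemberInView).
    private final ConcurrentMap<M, V> suspectedMemberInView = new ConcurrentHashMap<>();

    /** Returns true only the first time a member is suspected, so the caller sends one suspect message. */
    public boolean suspect(M member, V currentView) {
        return suspectedMemberInView.putIfAbsent(member, currentView) == null;
    }

    /** Any contact from the member clears the suspicion, as contactedBy() does in the diff. */
    public boolean contactedBy(M member) {
        return suspectedMemberInView.remove(member) != null;
    }

    public boolean isSuspect(M member) {
        return suspectedMemberInView.containsKey(member);
    }

    public static void main(String[] args) {
        SuspectTracker<String, Integer> tracker = new SuspectTracker<>();
        System.out.println(tracker.suspect("B", 5)); // true: first suspicion, send a message
        System.out.println(tracker.suspect("B", 5)); // false: already suspected, stay quiet
        tracker.contactedBy("B");
        System.out.println(tracker.suspect("B", 6)); // true: suspected again after recovering
    }
}
```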

SuspectMembersMessage processing now checks whether the receiver is the
target of the suspicion and, if so, sends a heartbeat to the sender.  This
seems to happen a lot when the membership coordinator is a locator because
the locator doesn't push operations out to other members very often.  The
positive heartbeat sender will also help with this.
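The new receive-side rule can be sketched like this. `SuspectProcessing`, `SuspectRequest` (a simplified stand-in for the Geode class of the same name), and `Heartbeat` are hypothetical; member IDs are plain strings, and sending is reduced to returning the heartbeats that would go out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class SuspectProcessing {
    /** Hypothetical stand-in for a SuspectRequest: who is suspected and why. */
    static final class SuspectRequest {
        final String suspectedMember;
        final String reason;

        SuspectRequest(String suspectedMember, String reason) {
            this.suspectedMember = suspectedMember;
            this.reason = reason;
        }
    }

    /** Hypothetical outbound heartbeat; only the recipient is modelled. */
    static final class Heartbeat {
        final String recipient;

        Heartbeat(String recipient) {
            this.recipient = recipient;
        }
    }

    /**
     * If this member is among the suspects, answer the sender with a heartbeat.
     * Suspicions about other members are left to the normal final-check logic,
     * which is not modelled here.
     */
    static List<Heartbeat> process(String self, String sender, List<SuspectRequest> requests) {
        List<Heartbeat> replies = new ArrayList<>();
        for (SuspectRequest sr : requests) {
            if (Objects.equals(sr.suspectedMember, self)) {
                replies.add(new Heartbeat(sender));
                break; // one heartbeat is enough to show we are alive
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        List<SuspectRequest> requests = List.of(new SuspectRequest("B", "no heartbeat response"));
        for (Heartbeat hb : process("B", "coordinator", requests)) {
            System.out.println("heartbeat -> " + hb.recipient);
        }
    }
}
```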

This change-set also turns off the JGroups thread pools because they were found
to be causing our performance problem.  This exposed a bug in JGroups that the
JGroups developers are fixing; for now there is a workaround in StatRecorder.
With the thread pools removed, messages must now pass through
handleOrDeferMessage() in GMSMembershipManager, since processMessage() can
block during initialization and cause a new process to time out while trying
to join the distributed system.
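The defer-until-initialized pattern behind handleOrDeferMessage() can be sketched as a small dispatcher: while the member is still starting up, incoming messages are only queued, so the receiver thread (which presumably now delivers messages directly, since the JGroups pools are off) is not held up by startup work; queued messages are delivered in arrival order once initialization completes. `DeferringDispatcher` is a hypothetical, simplified model, not the GMSMembershipManager implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class DeferringDispatcher<T> {
    private final Consumer<T> processor;
    private final Queue<T> deferred = new ArrayDeque<>();
    private boolean initialized = false;

    public DeferringDispatcher(Consumer<T> processor) {
        this.processor = processor;
    }

    /** Called by the receiver thread; during initialization the message is only queued. */
    public synchronized void handleOrDefer(T msg) {
        if (initialized) {
            processor.accept(msg);
        } else {
            deferred.add(msg);
        }
    }

    /** Called once startup completes: deliver everything queued so far, in arrival order. */
    public synchronized void initializationComplete() {
        initialized = true;
        T msg;
        while ((msg = deferred.poll()) != null) {
            processor.accept(msg);
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        DeferringDispatcher<String> dispatcher = new DeferringDispatcher<>(processed::add);
        dispatcher.handleOrDefer("view");      // queued: still initializing
        dispatcher.initializationComplete();   // delivers "view"
        dispatcher.handleOrDefer("heartbeat"); // processed immediately
        System.out.println(processed);         // [view, heartbeat]
    }
}
```

Draining the queue and processing new messages under the same lock keeps deferred messages ahead of later ones, at the cost of briefly serializing delivery while the backlog is drained.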

GMSJoinLeave was not setting the failure detection ports on a new view if it
abandoned a view that it could not prepare.

The Connection class had some incorrect checks for shutdown conditions when
the shared/ordered connection to another member is shut down.  This should
improve our detection time for crashed members.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f3034be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f3034be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f3034be6

Branch: refs/heads/feature/GEODE-77
Commit: f3034be681dca37aca59710df3d44794d22d0f4e
Parents: c152e20
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Thu Nov 5 10:55:37 2015 -0800
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Thu Nov 5 10:55:37 2015 -0800

----------------------------------------------------------------------
 .../internal/DistributionMessage.java           |   3 +-
 .../internal/membership/gms/GMSUtil.java        |   7 +-
 .../internal/membership/gms/Services.java       |   2 +-
 .../membership/gms/fd/GMSHealthMonitor.java     | 806 +++++++++++--------
 .../membership/gms/membership/GMSJoinLeave.java |  73 +-
 .../gms/messages/CheckRequestMessage.java       |  64 --
 .../gms/messages/CheckResponseMessage.java      |  54 --
 .../gms/messages/HeartbeatMessage.java          |  58 ++
 .../gms/messages/HeartbeatRequestMessage.java   |  64 ++
 .../gms/messages/InstallViewMessage.java        |  21 +-
 .../membership/gms/messages/SuspectRequest.java |   2 +-
 .../gms/messenger/JGroupsMessenger.java         |  23 +-
 .../membership/gms/messenger/StatRecorder.java  |  32 +-
 .../gms/mgr/GMSMembershipManager.java           |  21 +-
 .../gemstone/gemfire/internal/DSFIDFactory.java |   8 +-
 .../internal/DataSerializableFixedID.java       |   4 +-
 .../com/gemstone/gemfire/internal/Version.java  |   6 +-
 .../internal/cache/CreateRegionProcessor.java   |   2 +-
 .../internal/cache/tier/sockets/HandShake.java  |   4 +
 .../gemfire/internal/tcp/Connection.java        |  16 +-
 .../gemfire/internal/util/PluckStacks.java      |  47 +-
 .../membership/gms/messenger/jgroups-config.xml |   4 +-
 .../membership/gms/messenger/jgroups-mcast.xml  |   4 +-
 .../gemfire/cache30/ReconnectDUnitTest.java     | 111 +--
 .../membership/GMSHealthMonitorJUnitTest.java   | 463 -----------
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |   6 +-
 .../fd/GMSHealthMonitorJUnitTest.java           | 437 ++++++++++
 .../messenger/JGroupsMessengerJUnitTest.java    |  69 --
 28 files changed, 1222 insertions(+), 1189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
index 4383f1f..4fb97ac 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java
@@ -485,7 +485,8 @@ public abstract class DistributionMessage
    */
   public static boolean isPreciousThread() {
     String thrname = Thread.currentThread().getName();
-    return thrname.startsWith("Geode UDP");
+    //return thrname.startsWith("Geode UDP");
+    return thrname.startsWith("unicast receiver") || thrname.startsWith("multicast receiver");
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
index e1041f2..09c5c71 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSUtil.java
@@ -103,15 +103,16 @@ public class GMSUtil {
    * Formats the bytes in a buffer into hex octets, 50 per
    * line
    */
-  private String formatBytes(byte[] buf) {
+  public static String formatBytes(byte[] buf, int startIndex, int length) {
     StringBuilder w = new StringBuilder(20000);
-    for (int i=0; i<buf.length; i++) {
+    int count = 0;
+    for (int i=startIndex; i<length; i++, count++) {
       String s = Integer.toHexString(buf[i]&0xff);
       if (s.length() == 1) {
         w.append('0');
       }
       w.append(s).append(' ');
-      if ( (i%50) == 49 ) {
+      if ( (count%50) == 49 ) {
         w.append("\n");
       }
     }
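The revised `formatBytes` above starts at `startIndex` and breaks lines on a separate counter, so the 50-per-line layout no longer depends on the absolute buffer index. Its loop bound is `i<length`, which treats `length` as an exclusive end index; if `length` is meant as a byte count, as its name suggests, the bound would need to be `startIndex + length`. The sketch below assumes the byte-count reading (an assumption, not confirmed by the diff) and uses a hypothetical class name, `HexDump`.

```java
public class HexDump {
    /**
     * Formats buf[startIndex .. startIndex + length) as two-digit hex octets
     * separated by spaces, starting a new line after every 50 octets.
     * Assumes `length` is a byte count (hypothetical reading of the parameter).
     */
    public static String formatBytes(byte[] buf, int startIndex, int length) {
        StringBuilder w = new StringBuilder();
        int end = Math.min(buf.length, startIndex + length); // don't read past the buffer
        int count = 0;
        for (int i = startIndex; i < end; i++, count++) {
            String s = Integer.toHexString(buf[i] & 0xff);
            if (s.length() == 1) {
                w.append('0'); // pad single-digit values, e.g. 0x7 -> "07"
            }
            w.append(s).append(' ');
            if ((count % 50) == 49) {
                w.append("\n");
            }
        }
        return w.toString();
    }

    public static void main(String[] args) {
        byte[] data = {0x00, 0x7B, (byte) 0xFF, 0x0A};
        System.out.println(formatBytes(data, 1, 2)); // prints "7b ff "
    }
}
```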

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
index 8ab0bbd..d87ec8c 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/Services.java
@@ -56,7 +56,7 @@ public class Services {
   private InternalLogWriter logWriter;
   private InternalLogWriter securityLogWriter;
   
-  private Timer timer = new Timer("GemFire Membership Timer", true);
+  private Timer timer = new Timer("Geode Membership Timer", true);
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 3f5db38..76a5540 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1,7 +1,7 @@
 package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
 
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_REQUEST;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.CHECK_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
 import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
 
 import java.io.DataInputStream;
@@ -13,6 +13,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,6 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.logging.log4j.Logger;
 import org.jgroups.util.UUID;
 
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.GemFireConfigException;
 import com.gemstone.gemfire.SystemConnectException;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
@@ -45,13 +48,12 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
 import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.CheckResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
 import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
 
 /**
  * Failure Detection
@@ -86,14 +88,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   /** membership logger */
   private static Logger logger = Services.getLogger();
+  
+  /**
+   * The number of recipients of periodic heartbeats.  The recipients will
+   * be selected from the members that are likely to be monitoring this member.
+   */
+  private static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients", 2);
 
   /**
    * Member activity will be recorded per interval/period. Timer task will set interval's starting time.
-   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL will be configured
-   * via system property. Default will be 10. Atleast 1 interval is needed.
+   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured
+   * via a system property with a default of 2. At least 1 interval is needed.
    */
-  private static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 10) > 1 ? Integer.getInteger(
-      "geode.logical-message-received-interval", 10) : 10;
+  public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 2);
 
   /** stall time to wait for members leaving concurrently */
   public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200);
@@ -103,25 +110,32 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   /** this member's ID */
   private InternalDistributedMember localAddress;
 
-  final ConcurrentMap<InternalDistributedMember, CustomTimeStamp> memberVsLastMsgTS = new ConcurrentHashMap<>();
-  final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
-  final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberVsView = new ConcurrentHashMap<>();
-  final private Map<NetView, Set<SuspectRequest>> viewVsSuspectedMembers = new HashMap<>();
+  /**
+   * Timestamp at which we last had contact from a member
+   */
+  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new ConcurrentHashMap<>();
+  
+  /**
+   * Members currently being suspected and the view they were suspected in
+   */
+  final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>();
 
   /**
-   * currentSuspects tracks members that we've already checked and
-   * did not receive a response from.  This collection keeps us from
-   * checking the same member over and over if it's already under
-   * suspicion
+   * Replies to messages
+   */
+  final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
+  
+  /**
+   * Members suspected in a particular view
    */
-  final private Set<InternalDistributedMember> currentSuspects = new ConcurrentHashSet<>();
+  final private Map<NetView, Set<SuspectRequest>> viewVsSuspectedMembers = new HashMap<>();
 
   private ScheduledExecutorService scheduler;
 
   private ExecutorService checkExecutor;
 
-  List<SuspectRequest> suspectRequests = new ArrayList<SuspectRequest>();
-  private RequestCollector<SuspectRequest> suspectRequestCollectorThread;
+//  List<SuspectRequest> suspectRequests = new ArrayList<SuspectRequest>();
+//  private RequestCollector<SuspectRequest> suspectRequestCollectorThread;
 
   /**
    * to stop check scheduler
@@ -129,53 +143,30 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   private ScheduledFuture<?> monitorFuture;
   
   /** test hook */
-  boolean playingDead = false;
+  volatile boolean playingDead = false;
 
   /** test hook */
-  boolean beingSick = false;
+  volatile boolean beingSick = false;
   
   // For TCP check
   private ExecutorService serverSocketExecutor;
-  private static final int OK = 0x01;
-  private static final int ERROR = 0x02;  
+  private static final int OK = 0x7B;
+  private static final int ERROR = 0x00;  
   private InetAddress socketAddress;
   private volatile int socketPort;
   private volatile ServerSocket serverSocket;
 
-  public GMSHealthMonitor() {
-
-  }
-
-  public static void loadEmergencyClasses() {
-  }
-
-  /*
-   * It records the member activity for current time interval.
-   */
-  @Override
-  public void contactedBy(InternalDistributedMember sender) {
-    CustomTimeStamp cTS = new CustomTimeStamp(currentTimeStamp);
-    cTS = memberVsLastMsgTS.putIfAbsent(sender, cTS);
-    if (cTS != null) {
-      cTS.setTimeStamp(currentTimeStamp);
-    }
-    if (currentSuspects.remove(sender)) {
-      logger.info("No longer suspecting {}", sender);
-      setNextNeighbor(currentView, null);
-    }
-  }
-
   /**
    * this class is to avoid garbage
    */
-  private static class CustomTimeStamp {
+  private static class TimeStamp {
     private volatile long timeStamp;
     
-    CustomTimeStamp(long timeStamp) {
+    TimeStamp(long timeStamp) {
       this.timeStamp = timeStamp;
     }
 
-    public long getTimeStamp() {
+    public long getTime() {
       return timeStamp;
     }
 
@@ -213,19 +204,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
-        CustomTimeStamp nextNeighborTS;
+        TimeStamp nextNeighborTS;
         synchronized(GMSHealthMonitor.this) {
-          nextNeighborTS = GMSHealthMonitor.this.memberVsLastMsgTS.get(neighbour);
+          nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
         }
-
+        
         if (nextNeighborTS == null) {
-          CustomTimeStamp customTS = new CustomTimeStamp(currentTime);
-          memberVsLastMsgTS.put(neighbour, customTS);
+          TimeStamp customTS = new TimeStamp(currentTime);
+          memberTimeStamps.put(neighbour, customTS);
           return;
         }
         
         long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
-        long lastTS = currentTime - nextNeighborTS.getTimeStamp();
+        long lastTS = currentTime - nextNeighborTS.getTime();
         if (lastTS + interval >= memberTimeoutInMillis) {
           logger.trace("Checking member {} ", neighbour);
           // now do check request for this member;
@@ -253,9 +244,104 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   }
 
-  private CheckRequestMessage constructCheckRequestMessage(final InternalDistributedMember mbr) {
+  class ClientSocketHandler implements Runnable {
+
+    private Socket socket;
+
+    public ClientSocketHandler(Socket socket) {
+      this.socket = socket;
+    }
+
+    public void run() {
+      try {
+        DataInputStream in = new DataInputStream(socket.getInputStream());
+        OutputStream out = socket.getOutputStream();
+        short version = in.readShort();
+        int  vmViewId = in.readInt();
+        long uuidLSBs = in.readLong();
+        long uuidMSBs = in.readLong();
+        boolean debug = logger.isDebugEnabled();
+        GMSMember gmbr = (GMSMember) GMSHealthMonitor.this.localAddress.getNetMember();
+        UUID myUUID = gmbr.getUUID();
+        // during reconnect or rapid restart we will have a zero viewId but there may still
+        // be an old ID in the membership view that we do not want to respond to
+        int myVmViewId = gmbr.getVmViewId();
+        if (debug) {
+          if (playingDead) {
+            logger.debug("simulating sick member in health check");
+          } else if (vmViewId == myVmViewId
+            && uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits()) {
+            logger.debug("UUID matches my own - sending OK reply");
+          } else {
+            logger.debug("GMSHealthMonitor my UUID is {},{} received is {},{}.  My viewID is {} received is {}",
+              Long.toHexString(myUUID.getMostSignificantBits()),
+              Long.toHexString(myUUID.getLeastSignificantBits()),
+              Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs),
+              myVmViewId, vmViewId);
+          }
+        }
+        if (!playingDead
+            && uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits()
+            && vmViewId == myVmViewId) {
+          socket.setSoLinger(true, (int)memberTimeout);
+          out.write(OK);
+          out.flush();
+          socket.shutdownOutput();
+          if (debug) {
+            logger.debug("GMSHealthMonitor server socket replied OK.");
+          }
+        }
+        else {
+          socket.setSoLinger(true, (int)memberTimeout);
+          out.write(ERROR);
+          out.flush();
+          socket.shutdownOutput();
+          if (debug) {
+            logger.debug("GMSHealthMonitor server socket replied ERROR.");
+          }
+        }
+      } catch (IOException e) {
+        logger.trace("Unexpected exception", e);
+      } finally {
+        if (socket != null) {
+          try {
+            socket.close();
+          } catch (IOException e) {
+            logger.info("Unexpected exception", e);
+          }
+        }
+      }
+    }
+  }
+
+  public GMSHealthMonitor() {
+
+  }
+
+  public static void loadEmergencyClasses() {
+  }
+
+  /*
+   * It records the member activity for current time interval.
+   */
+  @Override
+  public void contactedBy(InternalDistributedMember sender) {
+    TimeStamp cTS = new TimeStamp(currentTimeStamp);
+    cTS = memberTimeStamps.putIfAbsent(sender, cTS);
+    if (cTS != null) {
+      cTS.setTimeStamp(currentTimeStamp);
+    }
+    if (suspectedMemberInView.remove(sender) != null) {
+      logger.info("No longer suspecting {}", sender);
+    }
+    setNextNeighbor(currentView, null);
+  }
+
+  private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
-    final CheckRequestMessage prm = new CheckRequestMessage(mbr, reqId);
+    final HeartbeatRequestMessage prm = new HeartbeatRequestMessage(mbr, reqId);
     prm.setRecipient(mbr);
 
     return prm;
@@ -272,14 +358,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
       @Override
       public void run() {
+        // TODO GemFire used the tcp/ip connection but this is using heartbeats
         boolean pinged = GMSHealthMonitor.this.doCheckMember(mbr);
         if (!pinged) {
-          String reason = "Member isn't responding to health checks";
+          suspectedMemberInView.put(mbr, currentView);
+          String reason = "Member isn't responding to heartbeat requests";
           GMSHealthMonitor.this.sendSuspectMessage(mbr, reason);
-          currentSuspects.add(mbr);
         } else {
           logger.trace("Setting next neighbor as member {} has responded.", mbr);
-          currentSuspects.remove(mbr);
+          suspectedMemberInView.remove(mbr);
           // back to previous one
           setNextNeighbor(GMSHealthMonitor.this.currentView, null);
         }
@@ -289,7 +376,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
-    logger.info("Sending suspect request {} reason=\"{}\"", mbr, reason);
     SuspectRequest sr = new SuspectRequest(mbr, reason);
     List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
     sl.add(sr);
@@ -297,44 +383,48 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   /**
-   * This method sends check request to other member and waits for {@link #MEMBER_CHECK_TIMEOUT}
+   * This method sends heartbeat request to other member and waits for member-timeout
    * time for response. If it doesn't see response then it returns false.
-   * @param pingMember
+   * @param member
    * @return
    */
-  private boolean doCheckMember(InternalDistributedMember pingMember) {
+  private boolean doCheckMember(InternalDistributedMember member) {
     if (playingDead) {
       // a member playingDead should not be sending messages to other
       // members, so we avoid sending heartbeat requests or suspect
       // messages by returning true.
       return true;
     }
-    logger.trace("Checking member {}", pingMember);
-    final CheckRequestMessage prm = constructCheckRequestMessage(pingMember);
+    logger.trace("Checking member {}", member);
+    final HeartbeatRequestMessage prm = constructHeartbeatRequestMessage(member);
     final Response pingResp = new Response();
     requestIdVsResponse.put(prm.getRequestId(), pingResp);
     try {
       Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(prm);
-      if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(pingMember)) {
+      if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
         // member is not part of current view.
-        logger.trace("Member {} is not part of current view.", pingMember);
+        logger.trace("Member {} is not part of current view.", member);
       } else {
         synchronized (pingResp) {
           if (pingResp.getResponseMsg() == null) {
-            pingResp.wait(services.getConfig().getMemberTimeout());
+            pingResp.wait(memberTimeout);
           }
-          CustomTimeStamp ts = memberVsLastMsgTS.get(pingMember);
+          TimeStamp ts = memberTimeStamps.get(member);
           if (pingResp.getResponseMsg() == null) {
             // double check the activity map
-            if (isStopping ||
-                (ts != null &&
-                 ts.getTimeStamp()
-                  > (System.currentTimeMillis() - services.getConfig().getMemberTimeout())
-                  )) {
+            long now = System.currentTimeMillis();
+            if (isStopping) {
               return true;
             }
+            if (ts != null && (now - ts.getTime()) <= memberTimeout) {
+              logger.trace("detected message traffic from member {}ms ago.  member-timeout is {}", now - ts.getTime(),
+                  memberTimeout);
+              return true;
+            }
+            logger.trace("no heartbeat response received from {}", member);
             return false;
           } else {
+            logger.trace("received heartbeat from {}", member);
             if (ts != null) {
               ts.setTimeStamp(System.currentTimeMillis());
             }
@@ -343,7 +433,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         }
       }
     } catch (InterruptedException e) {
-      logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", pingMember);
+      logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
     } finally {
       requestIdVsResponse.remove(prm.getRequestId());
     }
@@ -354,11 +444,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * During final check, establish TCP connection between current member and suspect member.
    * And exchange PING/PONG message to see if the suspect member is still alive.
    * 
-   * @param suspectMember member that does not respond to CheckRequestMessage
+   * @param suspectMember member that does not respond to HeartbeatRequestMessage
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
   private boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
-    logger.trace("Checking member {} with TCP socket connection.", suspectMember);
     Socket clientSocket = new Socket();
     try {
       // establish TCP connection
@@ -371,18 +460,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       if (clientSocket.isConnected()) {
         clientSocket.setSoTimeout((int) services.getConfig().getMemberTimeout());
         InputStream in = clientSocket.getInputStream();
-        DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());   
-        logger.info("TCP check: suspect member uuid: " + ((GMSMember) suspectMember.getNetMember()).getUUID());
+        DataOutputStream out = new DataOutputStream(clientSocket.getOutputStream());
+        GMSMember gmbr = (GMSMember) suspectMember.getNetMember();
         out.writeShort(Version.CURRENT_ORDINAL);
-        out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidLSBs());
-        out.writeLong(((GMSMember) suspectMember.getNetMember()).getUuidMSBs());
+        out.writeInt(gmbr.getVmViewId());
+        out.writeLong(gmbr.getUuidLSBs());
+        out.writeLong(gmbr.getUuidMSBs());
         out.flush();
         clientSocket.shutdownOutput();
-        logger.debug("Send suspect member uuid to member {} with TCP socket connection.", suspectMember);
+        logger.debug("Connected - reading response", suspectMember);
         int b = in.read();
-        logger.debug("Received {} from member {} with TCP socket connection.", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
+        logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
         if (b == OK) {
-          CustomTimeStamp ts = memberVsLastMsgTS.get(suspectMember);
+          TimeStamp ts = memberTimeStamps.get(suspectMember);
           if (ts != null) {
             ts.setTimeStamp(System.currentTimeMillis());
           }
@@ -394,6 +484,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       } else {// cannot establish TCP connection with suspect member
         return false;
       }
+    } catch (SocketTimeoutException e) {
+      return false;
     } catch (IOException e) {
       logger.trace("Unexpected exception", e);
     } finally {
@@ -417,14 +509,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    */
   @Override
   public void suspect(InternalDistributedMember mbr, String reason) {
-    synchronized (suspectRequests) {
-      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
-      if (!suspectRequests.contains(sr)) {
-        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-        suspectRequests.add(sr);
-        suspectRequests.notify();
-      }
-    }
+    sendSuspectMessage(mbr, reason);
+    // Background suspect-collecting thread is currently disabled - it takes too long
+//    synchronized (suspectRequests) {
+//      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
+//      if (!suspectRequests.contains(sr)) {
+//        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+//        suspectRequests.add(sr);
+//        suspectRequests.notify();
+//      }
+//    }
   }
 
   @Override
@@ -437,195 +531,214 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   public void start() {
-    {      
-      scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable r) {
-          Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection Scheduler");
-          th.setDaemon(true);
-          return th;
-        }
-      });
-    }
-    {
-      checkExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
-        AtomicInteger threadIdx = new AtomicInteger();
-
-        @Override
-        public Thread newThread(Runnable r) {
-          int id = threadIdx.getAndIncrement();
-          Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection thread " + id);
-          th.setDaemon(true);
-          return th;
-        }
-      });
-    }
-    {
-      Monitor m = this.new Monitor(memberTimeout);
-      long delay = memberTimeout / LOGICAL_INTERVAL;
-      monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
-    }
+    scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Scheduler");
+        th.setDaemon(true);
+        return th;
+      }
+    });
 
-    {
-      suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("GemFire Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
-          new Callback<SuspectRequest>() {
-            @Override
-            public void process(List<SuspectRequest> requests) {
-              GMSHealthMonitor.this.sendSuspectRequest(requests);
+    checkExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+      AtomicInteger threadIdx = new AtomicInteger();
 
-            }
-          }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-      suspectRequestCollectorThread.setDaemon(true);
-      suspectRequestCollectorThread.start();
-    }
+      @Override
+      public Thread newThread(Runnable r) {
+        int id = threadIdx.getAndIncrement();
+        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection thread " + id);
+        th.setDaemon(true);
+        return th;
+      }
+    });
+    Monitor m = this.new Monitor(memberTimeout);
+    long delay = memberTimeout / LOGICAL_INTERVAL;
+    monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
+
+//    suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
+//        new Callback<SuspectRequest>() {
+//      @Override
+//      public void process(List<SuspectRequest> requests) {
+//        GMSHealthMonitor.this.sendSuspectRequest(requests);
+//
+//      }
+//    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+//    suspectRequestCollectorThread.setDaemon(true);
+//    suspectRequestCollectorThread.start();
     
-    {
-      serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
-        AtomicInteger threadIdx = new AtomicInteger();
-
-        @Override
-        public Thread newThread(Runnable r) {
-          int id = threadIdx.getAndIncrement();
-          Thread th = new Thread(Services.getThreadGroup(), r, "GemFire Failure Detection Server thread " + id);
-          th.setDaemon(true);
-          return th;
-        }
-      });
+    serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
+      AtomicInteger threadIdx = new AtomicInteger();
 
-      serverSocketExecutor.execute(new Runnable() {
-        @Override
-        public void run() {
-          Socket socket = null;
-          try {
-            // start server socket for TCP check
-            if (serverSocket == null) {
-              localAddress = services.getMessenger().getMemberID();            
-              socketAddress = localAddress.getInetAddress();
-              int[] portRange = services.getConfig().getMembershipPortRange();            
-              socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET);
-              if (socketPort == -1) {
-                throw new SystemConnectException("Unable to find a free port in the membership port range");
+      @Override
+      public Thread newThread(Runnable r) {
+        int id = threadIdx.getAndIncrement();
+        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Server thread " + id);
+        th.setDaemon(true);
+        return th;
+      }
+    });
+
+  }
+
+  /**
+   * start the thread that listens for tcp/ip connections and responds
+   * to connection attempts
+   */
+  private void startTcpServer() {
+    // allocate a socket here so there are no race conditions between knowing the FD
+    // socket port and joining the system
+    socketAddress = localAddress.getInetAddress();
+    int[] portRange = services.getConfig().getMembershipPortRange();            
+    socketPort = AvailablePort.getAvailablePortInRange(portRange[0], portRange[1], AvailablePort.SOCKET);
+    if (socketPort == -1) {
+      throw new SystemConnectException("Unable to find a free port in the membership port range");
+    }
+    try {
+      serverSocket = new ServerSocket();
+      serverSocket.bind(new InetSocketAddress(socketAddress, socketPort));
+    } catch (IOException e) {
+      throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range");
+    }
+
+    serverSocketExecutor.execute(new Runnable() {
+      @Override
+      public void run() {
+        logger.info("Started failure detection server thread on {}:{}.", socketAddress, socketPort);
+        Socket socket = null;
+        try {
+          while (!services.getCancelCriterion().isCancelInProgress() 
+              && !GMSHealthMonitor.this.isStopping) {
+            try {
+              socket = serverSocket.accept();
+              if (GMSHealthMonitor.this.playingDead) {
+                continue;
               }
-              serverSocket = new ServerSocket();
-              serverSocket.bind(new InetSocketAddress(socketAddress, socketPort));
-              logger.info("Started failure detection server thread on {}:{}.", socketAddress, socketPort);
-              while (!services.getCancelCriterion().isCancelInProgress() 
-                  && !GMSHealthMonitor.this.isStopping) {
-                try {
-                  socket = serverSocket.accept();
-                  if (GMSHealthMonitor.this.playingDead) {
-                    continue;
-                  }
-                  socket.setSoTimeout((int) services.getConfig().getMemberTimeout());
-                  new ClientSocketHandler(socket).start();
-                } catch (IOException e) {
-                  if (!isStopping) {
-                    logger.trace("Unexpected exception", e);
-                  }
-                  try {
-                    if (socket != null) {
-                      socket.close();
-                    }
-                  } catch (IOException ioe) {
-                    logger.trace("Unexpected exception", ioe);
-                  }
-                }
+              // [bruce] do we really want a timeout on the server-side?
+//              socket.setSoTimeout((int) services.getConfig().getMemberTimeout());
+              serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start();  [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds
+            } catch (IOException e) {
+              if (!isStopping) {
+                logger.trace("Unexpected exception", e);
               }
-              logger.info("GMSHealthMonitor server thread exiting");
-            }
-          } catch (IOException e) {
-            logger.trace("Unexpected exception", e);
-          } finally {
-            // close the server socket
-            if (serverSocket != null && !serverSocket.isClosed()) {
               try {
-                serverSocket.close();
-                serverSocket = null;
-                logger.info("GMSHealthMonitor server socket closed.");
-              } catch (IOException e) {
-                logger.debug("Unexpected exception", e);
+                if (socket != null) {
+                  socket.close();
+                }
+              } catch (IOException ioe) {
+                logger.trace("Unexpected exception", ioe);
               }
             }
           }
+          logger.info("GMSHealthMonitor server thread exiting");
+        } finally {
+          // close the server socket
+          if (serverSocket != null && !serverSocket.isClosed()) {
+            try {
+              serverSocket.close();
+              serverSocket = null;
+              logger.info("GMSHealthMonitor server socket closed.");
+            } catch (IOException e) {
+              logger.debug("Unexpected exception", e);
+            }
+          }
         }
-      });
-    }
+      }
+    });
   }
-
-  class ClientSocketHandler extends Thread {
-
-    private Socket socket;
-
-    public ClientSocketHandler(Socket socket) {
-      super(services.getThreadGroup(), "ClientSocketHandler");
-      this.socket = socket;
-      setDaemon(true);
-    }
-
-    public void run() {
-      try {
-        DataInputStream in = new DataInputStream(socket.getInputStream());
-        OutputStream out = socket.getOutputStream();
-        short version = in.readShort();
-        long uuidLSBs = in.readLong();
-        long uuidMSBs = in.readLong();
-        boolean debug = logger.isDebugEnabled();
-        if (debug) {
-          logger.debug("GMSHealthMonitor received health check UUID {},{}",
-              Long.toHexString(uuidMSBs),
-              Long.toHexString(uuidLSBs));
-        }
-        UUID myUUID = ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID();
-        if (debug) {
-          if (playingDead) {
-            logger.debug("simulating sick member in health check");
-          } else if (uuidLSBs == myUUID.getLeastSignificantBits()
-            && uuidMSBs == myUUID.getMostSignificantBits()) {
-            logger.debug("UUID matches my own - sending OK reply");
-          } else {
-            logger.debug("GMSHealthMonitor my UUID is                 {},{}",
-              Long.toHexString(myUUID.getMostSignificantBits()),
-              Long.toHexString(myUUID.getLeastSignificantBits()));
+  
+  /**
+   * start the thread that periodically sends a message to processes
+   * that might be watching this process
+   */
+  private void startHeartbeatThread() {
+    checkExecutor.execute(new Runnable() {
+      public void run() {
+        Thread.currentThread().setName("Geode Heartbeat Sender");
+        sendPeriodicHeartbeats();
+      }
+      private void sendPeriodicHeartbeats() {
+        while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+          try {
+            Thread.sleep(memberTimeout/LOGICAL_INTERVAL);
+          } catch (InterruptedException e) {
+            return;
           }
-        }
-        if (!playingDead
-            && uuidLSBs == myUUID.getLeastSignificantBits()
-            && uuidMSBs == myUUID.getMostSignificantBits()) {
-          out.write(OK);
-          out.flush();
-          socket.shutdownOutput();
-          if (debug) {
-            logger.debug("GMSHealthMonitor server socket replied OK.");
+          NetView v = currentView;
+          if (v != null) {
+            List<InternalDistributedMember> mbrs = v.getMembers();
+            int index = mbrs.indexOf(localAddress);
+            if (index < 0 || mbrs.size() < 2) {
+              continue;
+            }
+            if (!playingDead) {
+              sendHeartbeats(mbrs, index);
+            }
           }
         }
-        else {
-          out.write(ERROR);
-          out.flush();
-          socket.shutdownOutput();
-          if (debug) {
-            logger.debug("GMSHealthMonitor server socket replied ERROR.");
+      }
+      
+      private void sendHeartbeats(List<InternalDistributedMember> mbrs, int startIndex) {
+        InternalDistributedMember coordinator = currentView.getCoordinator();
+        if (coordinator != null && !coordinator.equals(localAddress)) {
+          HeartbeatMessage message = new HeartbeatMessage(-1);
+          message.setRecipient(coordinator);
+          try {
+            services.getMessenger().sendUnreliably(message);
+          } catch (CancelException e) {
+            return;
           }
         }
-      } catch (IOException e) {
-        logger.trace("Unexpected exception", e);
-      } finally {
-        if (socket != null) {
+
+        int index = startIndex;
+        int numSent = 0;
+        for (;;) {
+          index--;
+          if (index < 0) {
+            index = mbrs.size()-1;
+          }
+          InternalDistributedMember mbr = mbrs.get(index);
+          if (mbr.equals(localAddress)) {
+            break;
+          }
+          if (mbr.equals(coordinator)) {
+            continue;
+          }
+          HeartbeatMessage message = new HeartbeatMessage(-1);
+          message.setRecipient(mbr);
           try {
-            socket.close();
-          } catch (IOException e) {
-            logger.info("Unexpected exception", e);
+            services.getMessenger().sendUnreliably(message);
+            numSent++;
+            if (numSent >= NUM_HEARTBEATS) {
+              break;
+            }
+          } catch (CancelException e) {
+            return;
           }
         }
-      }
-    }
+      } // sendHeartbeats
+    });
   }
 
   public synchronized void installView(NetView newView) {
     synchronized (viewVsSuspectedMembers) {
       viewVsSuspectedMembers.clear();
     }
-    currentSuspects.removeAll(newView.getCrashedMembers());
-    currentSuspects.removeAll(newView.getShutdownMembers());
+    for (Iterator<InternalDistributedMember> it=memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
+      if (!newView.contains(it.next())) {
+        it.remove();
+      }
+    }
+    for (Iterator<InternalDistributedMember> it=suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
+      if (!newView.contains(it.next())) {
+        it.remove();
+      }
+    }
+//    for (InternalDistributedMember mbr: newView.getMembers()) {
+//      if (!memberVsLastMsgTS.containsKey(mbr)) {
+//        CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
+//        memberVsLastMsgTS.put(mbr, customTS);
+//      }
+//    }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
@@ -640,21 +753,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * member next to suspect member.
    */
   private synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo) {
+    if (newView == null) {
+      return;
+    }
     if (nextTo == null) {
       nextTo = localAddress;
     }
-    boolean sameView = false;
-
-    if (currentView != null &&
-        newView.getCreator().equals(currentView.getCreator()) &&
-        newView.getViewId() == currentView.getViewId()) {
-      sameView = true;
-    }
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
     
     Set<?> checkAllSuspected = new HashSet<>(allMembers);
-    checkAllSuspected.removeAll(currentSuspects);
+    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
     checkAllSuspected.remove(localAddress);
     if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
       logger.info("All other members are suspect at this point");
@@ -666,7 +775,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     if (index != -1) {
       int nextNeighborIndex = (index + 1) % allMembers.size();
       InternalDistributedMember newNeighbor = allMembers.get(nextNeighborIndex);
-      if (currentSuspects.contains(newNeighbor)) {
+      if (suspectedMemberInView.containsKey(newNeighbor)) {
         setNextNeighbor(newView, newNeighbor);
         return;
       }
@@ -681,18 +790,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       nextNeighbor = null;
     }
 
-    if (!sameView || memberVsLastMsgTS.size() == 0) {
-      
-      if (memberVsLastMsgTS.size() > 0) {
-        memberVsLastMsgTS.clear();
-      }
-
-      long cts = System.currentTimeMillis();
-      for (InternalDistributedMember mbr: allMembers) {
-        CustomTimeStamp customTS = new CustomTimeStamp(cts);
-        memberVsLastMsgTS.put(mbr, customTS);
-      }
-    }
   }
 
   /*** test method */
@@ -702,16 +799,19 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   @Override
   public void init(Services s) {
+    isStopping = false;
     services = s;
     memberTimeout = s.getConfig().getMemberTimeout();
-    services.getMessenger().addHandler(CheckRequestMessage.class, this);
-    services.getMessenger().addHandler(CheckResponseMessage.class, this);
+    services.getMessenger().addHandler(HeartbeatRequestMessage.class, this);
+    services.getMessenger().addHandler(HeartbeatMessage.class, this);
     services.getMessenger().addHandler(SuspectMembersMessage.class, this);
   }
 
   @Override
   public void started() {
     this.localAddress = services.getMessenger().getMemberID();
+    startTcpServer();
+    startHeartbeatThread();
   }
 
   @Override
@@ -753,19 +853,31 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         }
       }      
       serverSocketExecutor.shutdownNow();
+      try {
+        serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
       logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
     }
     
-    if (suspectRequestCollectorThread != null) {
-      suspectRequestCollectorThread.shutdown();
-    }
+//    if (suspectRequestCollectorThread != null) {
+//      suspectRequestCollectorThread.shutdown();
+//    }
   }
 
   /***
    * test method
    */
   public boolean isShutdown() {
-    return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() && !suspectRequestCollectorThread.isAlive();
+    return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown() /*&& !suspectRequestCollectorThread.isAlive()*/;
+  }
+
+  /**
+   * Test method - check to see if a member is under suspicion
+   */
+  public boolean isSuspectMember(InternalDistributedMember m) {
+    return this.suspectedMemberInView.containsKey(m);
   }
 
   @Override
@@ -785,8 +897,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   @Override
   public void playDead() {
-    sendSuspectMessage(localAddress, "playDead invoked on GMSHealthMonitor");
     this.playingDead = true;
+    sendSuspectMessage(localAddress, "playDead invoked on GMSHealthMonitor");
   }
 
   @Override
@@ -806,21 +918,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return;
     }
 
-    logger.trace("HealthMonitor processing {}", m);
+    logger.trace("processing {}", m);
 
     switch (m.getDSFID()) {
-    case CHECK_REQUEST:
+    case HEARTBEAT_REQUEST:
       if (beingSick || playingDead) {
         logger.debug("sick member is ignoring check request");
       } else {
-        processCheckRequest((CheckRequestMessage) m);
+        processHeartbeatRequest((HeartbeatRequestMessage) m);
       }
       break;
-    case CHECK_RESPONSE:
+    case HEARTBEAT_RESPONSE:
       if (beingSick || playingDead) {
         logger.debug("sick member is ignoring check response");
       } else {
-        processCheckResponse((CheckResponseMessage) m);
+        processHeartbeat((HeartbeatMessage) m);
       }
       break;
     case SUSPECT_MEMBERS_MESSAGE:
@@ -835,7 +947,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
   }
 
-  private void processCheckRequest(CheckRequestMessage m) {
+  private void processHeartbeatRequest(HeartbeatRequestMessage m) {
     
     if (this.isStopping || this.playingDead) {
       return;
@@ -848,24 +960,29 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     // membership port as it had in its last incarnation, causing
     // delays in removing the old member ID from the view.
     if (me.getVmViewId() < 0 || m.getTarget().equals(me)) {
-      CheckResponseMessage prm = new CheckResponseMessage(m.getRequestId());
+      HeartbeatMessage prm = new HeartbeatMessage(m.getRequestId());
       prm.setRecipient(m.getSender());
       Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(prm);
       if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(m.getSender())) {
-        logger.debug("Unable to send check response to member: {}", m.getSender());
+        logger.debug("Unable to send heartbeat to member: {}", m.getSender());
       }
     } else {
-      logger.debug("Ignoring check request intended for {}.  My ID is {}", m.getTarget(), me);
+      logger.debug("Ignoring heartbeat request intended for {}.  My ID is {}", m.getTarget(), me);
     }
   }
 
-  private void processCheckResponse(CheckResponseMessage m) {
-    Response resp = requestIdVsResponse.get(m.getRequestId());
-    logger.trace("Got check response from member {}. {}", m.getSender(), (resp != null ? "Check Thread still waiting" : "Check thread is not waiting"));
-    if (resp != null) {
-      synchronized (resp) {
-        resp.setResponseMsg(m);
-        resp.notify();
+  private void processHeartbeat(HeartbeatMessage m) {
+    if (m.getRequestId() < 0) {
+      // a periodic heartbeat
+      contactedBy(m.getSender());
+    } else {
+      Response resp = requestIdVsResponse.get(m.getRequestId());
+      logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != null ? "Check thread still waiting" : "Check thread is not waiting"));
+      if (resp != null) {
+        synchronized (resp) {
+          resp.setResponseMsg(m);
+          resp.notify();
+        }
       }
     }
   }
@@ -895,6 +1012,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return;
     }
 
+    // take care of any suspicion of this member by sending a heartbeat back
+    if (!playingDead) {
+      for (Iterator<SuspectRequest> it = incomingRequest.getMembers().iterator(); it.hasNext(); ) {
+        SuspectRequest req = it.next();
+        if (req.getSuspectMember().equals(localAddress)) {
+          HeartbeatMessage message = new HeartbeatMessage(-1);
+          message.setRecipient(sender);
+          try {
+            services.getMessenger().send(message);
+            it.remove();
+          } catch (CancelException e) {
+            return;
+          }
+        }
+      }
+    }
+
+    
     if (cv.getCoordinator().equals(localAddress)) {
       for (SuspectRequest req: incomingRequest.getMembers()) {
         logger.info("received suspect message from {} for {}: {}",
@@ -948,9 +1083,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         viewVsSuspectedMembers.put(cv, viewVsMembers);
       }
       for (SuspectRequest sr: sMembers) {       
-        if (!viewVsMembers.contains(sr)) {
-          viewVsMembers.add(sr);
-        }
+        viewVsMembers.add(sr);
       }
     }
   }
@@ -969,10 +1102,15 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         continue;// self
       }
 
-      NetView view;
-      view = suspectedMemberVsView.putIfAbsent(mbr, cv);
+      // suspectedMemberInView is now set by the heartbeat monitoring code
+      // to allow us to move on from watching members we've already
+      // suspected.  Since that code is updating this collection we
+      // cannot use it here as an indication that a member is currently
+      // undergoing a final check.
+//      NetView view;
+//      view = suspectedMemberInView.putIfAbsent(mbr, cv);
 
-      if (view == null || !view.equals(cv)) {
+//      if (view == null || !view.equals(cv)) {
         final String reason = sr.getReason();
         logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
         // its a coordinator
@@ -983,8 +1121,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
             try {
               services.memberSuspected(initiator, mbr);
               long startTime = System.currentTimeMillis();
-              CustomTimeStamp ts = new CustomTimeStamp(startTime);
-              memberVsLastMsgTS.put(mbr, ts);
+              // for some reason we used to update the timestamp for the member
+              // with the startTime, but we don't want to do that because it looks
+              // like a heartbeat has been received
 
               logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
               boolean pinged;
@@ -999,27 +1138,35 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
               } else {
                 pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
               }
-              logger.info("Final check {}", pinged? "succeeded" : "failed");
               
+              boolean failed = false;
               if (!pinged && !isStopping) {
-                ts = memberVsLastMsgTS.get(mbr);
-                if (ts == null || ts.getTimeStamp() <= startTime) {
+                TimeStamp ts = memberTimeStamps.get(mbr);
+                if (ts == null || ts.getTime() <= startTime) {
+                  logger.info("Final check failed - requesting removal");
                   services.getJoinLeave().remove(mbr, reason);
+                  failed = true;
+                } else {
+                  logger.info("check failed but detected recent message traffic");
                 }
               }
+              if (!failed) {
+                logger.info("Final check passed");
+              }
               // whether it's alive or not, at this point we allow it to
               // be watched again
+              suspectedMemberInView.remove(mbr);
               contactedBy(mbr);
             } catch (DistributedSystemDisconnectedException e) {
               return;
             } catch (Exception e) {
               logger.info("Unexpected exception while verifying member", e);
             } finally {
-              GMSHealthMonitor.this.suspectedMemberVsView.remove(mbr);
+              GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
             }
           }
         });
-      }// scheduling for final check and removing it..
+//      }// scheduling for final check and removing it..
     }
   }
 
@@ -1104,7 +1251,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           } // synchronized
           if (requests != null && !requests.isEmpty()) {
             if (logger != null && logger.isDebugEnabled()) {
-              logger.debug("Health Monitor is sending {} member suspect requests to coordinator", requests.size());
+              logger.info("Health Monitor is sending {} member suspect requests to coordinator", requests.size());
             }
             callback.process(requests);
             requests = null;
@@ -1118,22 +1265,23 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
+    // the background suspect-collector thread is currently disabled
+//    synchronized (suspectRequests) {
+//      if (suspectRequests.size() > 0) {
+//        for (SuspectRequest sr: suspectRequests) {
+//          if (!requests.contains(sr)) {
+//            requests.add(sr);
+//          }
+//        }
+//        suspectRequests.clear();
+//      }
+//    }
     logger.debug("Sending suspect request for members {}", requests);
-    synchronized (suspectRequests) {
-      if (suspectRequests.size() > 0) {
-        for (SuspectRequest sr: suspectRequests) {
-          if (!requests.contains(sr)) {
-            requests.add(sr);
-          }
-        }
-        suspectRequests.clear();
-      }
-    }
     List<InternalDistributedMember> recipients;
+//  TODO this needs some rethinking - we need the guys near the
+//  front of the membership view who aren't preferred for coordinator
+//  to see the suspect message.
 //    if (v.size() > 20) {
-//      // TODO this needs some rethinking - we need the guys near the
-//      // front of the membership view who aren't preferred for coordinator
-//      // to see the suspect message.
 //      HashSet<InternalDistributedMember> filter = new HashSet<InternalDistributedMember>();
 //      for (int i = 0; i < requests.size(); i++) {
 //        filter.add(requests.get(i).getSuspectMember());
@@ -1147,7 +1295,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     Set<InternalDistributedMember> failedRecipients;
     try {
       failedRecipients = services.getMessenger().send(rmm);
-    } catch (DistributedSystemDisconnectedException e) {
+    } catch (CancelException e) {
       return;
     }
 

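The recipient selection done by the new positive heartbeat sender (`sendHeartbeats` in the GMSHealthMonitor hunk above) can be sketched on its own. This is a minimal, self-contained approximation, not the Geode code. It makes three substitutions. Members are plain values instead of `InternalDistributedMember`. Sending is replaced by returning the list of targets. The hypothetical `numHeartbeats` parameter stands in for `NUM_HEARTBEATS`, whose value is not shown in this diff.

The reason for walking backwards comes from `setNextNeighbor`. There, each member watches the member after it in the view, at `(index + 1) % allMembers.size()`. The members most likely to be watching this process are therefore the ones before it. The walk starts at the local index, moves backwards, and wraps around. The coordinator is notified separately, first, unless it is the local member.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HeartbeatTargets {

  /**
   * Sketch (not the Geode implementation) of choosing unsolicited heartbeat
   * recipients: the coordinator first, then up to numHeartbeats predecessors
   * of the local member in view order, wrapping around the view.
   * numHeartbeats is a hypothetical stand-in for NUM_HEARTBEATS.
   */
  static <T> List<T> selectHeartbeatTargets(List<T> members, T self, T coordinator,
      int numHeartbeats) {
    List<T> targets = new ArrayList<>();
    int start = members.indexOf(self);
    if (start < 0 || members.size() < 2) {
      return targets; // not in the view, or alone in it: nobody to notify
    }
    if (coordinator != null && !coordinator.equals(self)) {
      targets.add(coordinator); // coordinator always gets a heartbeat
    }
    int index = start;
    int numSelected = 0;
    while (numSelected < numHeartbeats) {
      index--;
      if (index < 0) {
        index = members.size() - 1; // wrap around to the end of the view
      }
      T mbr = members.get(index);
      if (mbr.equals(self)) {
        break; // walked all the way around the view
      }
      if (mbr.equals(coordinator)) {
        continue; // already added above
      }
      targets.add(mbr);
      numSelected++;
    }
    return targets;
  }

  public static void main(String[] args) {
    List<String> view = Arrays.asList("A", "B", "C", "D", "E");
    // "A" is the coordinator, the local member is "D", two peer heartbeats
    System.out.println(selectHeartbeatTargets(view, "D", "A", 2)); // prints [A, C, B]
  }
}
```

With the view [A, B, C, D, E], coordinator A, local member D and `numHeartbeats = 2`, the sketch selects A and then D's two predecessors, C and B. The original loop checks its limit only after a send, so it behaves the same for any limit of at least 1. This sketch selects no peers beyond the coordinator when the limit is 0.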
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 0b2abe3..ed5535f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -13,6 +13,7 @@ import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MES
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -362,9 +363,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         logger.info("received join response with no membership view: {}", response);
       }
     } else {
-      logger.debug("received no join response");
+      if (!isJoined) {
+        logger.debug("received no join response");
+      }
     }
-    return false;
+    return isJoined;
   }
 
   /**
@@ -422,6 +425,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     logger.info("received leave request from {} for {}", incomingRequest.getSender(), incomingRequest.getMemberID());
 
     NetView v = currentView;
+    if (v == null) {
+      recordViewRequest(incomingRequest);
+      return;
+    }
+    
+    
     InternalDistributedMember mbr = incomingRequest.getMemberID();
 
     if (logger.isDebugEnabled()) {
@@ -593,11 +602,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (currentView == null) {
       // create the initial membership view
       NetView newView = new NetView(this.localAddress);
+      newView.setFailureDetectionPort(localAddress, services.getHealthMonitor().getFailureDetectionPort());
       this.localAddress.setVmViewId(0);
       installView(newView);
       isJoined = true;
       if (viewCreator == null || viewCreator.isShutdown()) {
-        viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
+        viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
         viewCreator.setDaemon(true);
         viewCreator.start();
         startViewBroadcaster();
@@ -633,7 +643,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         newView.setFailureDetectionPort(this.localAddress, services.getHealthMonitor().getFailureDetectionPort());
       }
       if (viewCreator == null || viewCreator.isShutdown()) {
-        viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
+        viewCreator = new ViewCreator("Geode Membership View Creator", Services.getThreadGroup());
         viewCreator.setInitialView(newView, leaving, removals);
         viewCreator.setDaemon(true);
         viewCreator.start();
@@ -691,7 +701,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       return true;
     }
 
-    logger.info((preparing ? "preparing" : "sending") + " new view " + view);
+    StringBuilder s = new StringBuilder();
+    int[] ports = view.getFailureDetectionPorts();
+    int numMembers = view.size();
+    for (int i=0; i<numMembers; i++) {
+      if (i > 0) {
+        s.append(' ');
+      }
+      s.append(ports[i]);
+    }
+    logger.info((preparing ? "preparing" : "sending") + " new view " + view
+        + "\nfailure detection ports: " + s.toString());
 
     msg.setRecipients(recips);
 
@@ -736,8 +756,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
   private void processViewMessage(InstallViewMessage m) {
 
-    logger.debug("Membership: processing {}", m);
-
     NetView view = m.getView();
 
     if (currentView != null && view.getViewId() < currentView.getViewId()) {
@@ -752,6 +770,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       } else {
         this.preparedView = view;
         ackView(m);
+        if (!this.isJoined) {
+          // if we're still waiting for a join response and we're in this view we
+          // should install the view so join() can finish its work
+          for (InternalDistributedMember mbr: view.getMembers()) {
+            if (localAddress.compareTo(mbr) == 0) {
+              installView(view);
+              break;
+            }
+          }
+        }
       }
     } else { // !preparing
       if (currentView != null && !view.contains(this.localAddress)) {
@@ -759,7 +787,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           forceDisconnect("This node is no longer in the membership view");
         }
       } else {
-        ackView(m);
+        if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
+          ackView(m);
+        }
         installView(view);
       }
     }
@@ -1057,13 +1087,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             GMSMember me = (GMSMember) this.localAddress.getNetMember();
             me.setBirthViewId(birthViewId);
             me.setSplitBrainEnabled(mbr.getNetMember().splitBrainEnabled());
-            isJoined = true;
             break;
           }
         }
       }
 
-      if (isNetworkPartition(newView)) {
+      if (isJoined && isNetworkPartition(newView)) {
         if (quorumRequired) {
           Set<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
           forceDisconnect(LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
@@ -1076,6 +1105,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       lastConflictingView = null;
       services.installView(newView);
 
+      isJoined = true;
+      synchronized(joinResponse) {
+        joinResponse.notify();
+      }
+
       if (!newView.getCreator().equals(this.localAddress)) {
         if (newView.shouldBeCoordinator(this.localAddress)) {
           becomeCoordinator();
@@ -1357,7 +1391,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     if (isStopping) {
       return;
     }
-    logger.debug("JoinLeave processing {}", m);
+    logger.debug("processing {}", m);
     switch (m.getDSFID()) {
     case JOIN_REQUEST:
       processJoinRequest((JoinRequestMessage) m);
@@ -1572,9 +1606,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         InstallViewMessage msg = new InstallViewMessage(v, services.getAuthenticator().getCredentials(localAddress));
         Collection<InternalDistributedMember> recips = new ArrayList<>(v.size() + v.getCrashedMembers().size());
         recips.addAll(v.getMembers());
+        recips.remove(localAddress);
         recips.addAll(v.getCrashedMembers());
         msg.setRecipients(recips);
-        services.getMessenger().send(msg);
+        // use sendUnreliably since we are sending to crashed members &
+        // don't want any retransmission tasks set up for them
+        services.getMessenger().sendUnreliably(msg);
       }
     }
     
@@ -1669,7 +1706,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
               if (System.currentTimeMillis() < okayToCreateView) {
                 // sleep to let more requests arrive
                 try {
-                  sleep(100);
+                  viewRequests.wait(100);
                   continue;
                 } catch (InterruptedException e) {
                   return;
@@ -1828,9 +1865,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             }
           }
         }
-        if (logger.isDebugEnabled()) {
-          logger.debug("Established failure detection ports for new view: {}", newView.getFailureDetectionPorts());
-        }
       }
 
       // if there are no membership changes then abort creation of
@@ -1935,7 +1969,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           removalReqs.addAll(failures);
           List<InternalDistributedMember> newMembers = new ArrayList<>(newView.getMembers());
           newMembers.removeAll(removalReqs);
-          newView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
+          NetView nextView = new NetView(localAddress, newView.getViewId() + 1, newMembers, leaveReqs, removalReqs);
+          for (InternalDistributedMember mbr: newView.getMembers()) {
+            nextView.setFailureDetectionPort(mbr, newView.getFailureDetectionPort(mbr));
+          }
           int size = failures.size();
           List<String> reasons = new ArrayList<>(size);
           for (int i=0; i<size; i++) {
@@ -2025,7 +2062,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         @Override
         public Thread newThread(Runnable r) {
           return new Thread(Services.getThreadGroup(), r,
-              "GemFire View Creator verification thread " + i.incrementAndGet());
+              "Geode View Creator verification thread " + i.incrementAndGet());
         }
       });
 

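The GMSJoinLeave changes above follow a guarded-wait pattern. `installView` sets `isJoined = true` and then calls `notify()` on `joinResponse` so that a waiting `join()` can finish. The view creator also replaces `sleep(100)` with `viewRequests.wait(100)`, which lets it be woken early. Below is a minimal sketch of that idiom in plain Java. `JoinLatch`, `markJoined` and `awaitJoined` are hypothetical names, not Geode APIs, and the sketch uses `notifyAll()` rather than the diff's `notify()` so that every waiter re-checks the flag. `Object.wait` may only be called while holding the object's monitor, and the condition should be re-checked in a loop because wakeups can be spurious.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a flag guarded by an object monitor (not Geode code). */
public class JoinLatch {
  private final Object lock = new Object();
  private boolean joined; // guarded by lock

  /** Called by the thread that installs the view. */
  public void markJoined() {
    synchronized (lock) {
      joined = true;
      lock.notifyAll(); // wake every waiter; each one re-checks the flag
    }
  }

  /** Waits up to timeoutMs for markJoined(); returns the final flag value. */
  public boolean awaitJoined(long timeoutMs) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    synchronized (lock) {
      // Loop: wait() can return spuriously or after a notify meant for someone else.
      while (!joined) {
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
        if (remainingMs <= 0) {
          break; // timed out; also avoids wait(0), which would block forever
        }
        lock.wait(remainingMs); // releases the monitor while waiting
      }
      return joined;
    }
  }

  public static void main(String[] args) throws Exception {
    JoinLatch latch = new JoinLatch();
    Thread installer = new Thread(() -> {
      try { Thread.sleep(50); } catch (InterruptedException ignored) { }
      latch.markJoined();
    });
    installer.start();
    System.out.println("joined=" + latch.awaitJoined(5_000));
    installer.join();
  }
}
```

Unlike `sleep`, `wait` gives up the monitor for the duration of the timeout, so other threads can add requests or signal completion in the meantime.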
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckRequestMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckRequestMessage.java
deleted file mode 100755
index 75f6b6e..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckRequestMessage.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.Version;
-
-public class CheckRequestMessage extends HighPriorityDistributionMessage{
-
-  int requestId;
-  InternalDistributedMember target;
-  
-  public CheckRequestMessage(InternalDistributedMember neighbour, int id) {
-    requestId = id;
-    this.target = neighbour;
-  }
-  
-  public CheckRequestMessage(){}
-  
-  public InternalDistributedMember getTarget() {
-    return target;
-  }
-  
-  @Override
-  public int getDSFID() {
-    return CHECK_REQUEST;
-  }
-
-  @Override
-  protected void process(DistributionManager dm) {
-    throw new IllegalStateException("this message is not intended to execute in a thread pool");
-  }   
-
-  @Override
-  public String toString() {
-    return "CheckRequestMessage [requestId=" + requestId + "]";
-  }
-
-  public int getRequestId() {
-    return requestId;
-  }
-
-  @Override
-  public Version[] getSerializationVersions() {
-    return null;
-  }  
-  
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(requestId);
-    DataSerializer.writeObject(target, out);
-  }
-  
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    requestId = in.readInt();
-    target = DataSerializer.readObject(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckResponseMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckResponseMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckResponseMessage.java
deleted file mode 100755
index b6f3735..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/CheckResponseMessage.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
-import com.gemstone.gemfire.internal.Version;
-
-public class CheckResponseMessage extends HighPriorityDistributionMessage {
-  int requestId;
-  
-  public CheckResponseMessage(int id) {
-    requestId = id;
-  }
-
-  public CheckResponseMessage(){}
-  
-  public int getRequestId() {
-    return requestId;
-  }
-
-
-  @Override
-  public int getDSFID() {
-    return CHECK_RESPONSE;
-  }
-
-  @Override
-  protected void process(DistributionManager dm) {
-    throw new IllegalStateException("this message is not intended to execute in a thread pool");
-  }
- 
-  @Override
-  public String toString() {
-    return "CheckResponseMessage [requestId=" + requestId + "]";
-  }
-
-  @Override
-  public Version[] getSerializationVersions() {
-    return null;
-  }  
-
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    out.writeInt(requestId);
-  }
-  
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    requestId = in.readInt();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java
new file mode 100755
index 0000000..a215ffc
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java
@@ -0,0 +1,58 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.internal.Version;
+
+public class HeartbeatMessage extends HighPriorityDistributionMessage {
+  /**
+   * RequestId identifies the HeartbeatRequestMessage for which this is a response.
+   * If it is < 0 this is a periodic heartbeat message.
+   */
+  int requestId;
+  
+  public HeartbeatMessage(int id) {
+    requestId = id;
+  }
+
+  public HeartbeatMessage(){}
+  
+  public int getRequestId() {
+    return requestId;
+  }
+
+
+  @Override
+  public int getDSFID() {
+    return HEARTBEAT_RESPONSE;
+  }
+
+  @Override
+  protected void process(DistributionManager dm) {
+    throw new IllegalStateException("this message is not intended to execute in a thread pool");
+  }
+ 
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()+" [requestId=" + requestId + "]";
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }  
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(requestId);
+  }
+  
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    requestId = in.readInt();
+  }
+}

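The javadoc on `HeartbeatMessage.requestId` describes how the field is used. A negative value marks an unsolicited, periodic heartbeat. A non-negative value echoes the id of the `HeartbeatRequestMessage` being answered. Below is a minimal sketch of that convention and its one-int wire layout, using only `java.io` streams. `HeartbeatSketch`, `isPeriodic` and the `PERIODIC` sentinel are hypothetical names. The real class is serialized through Geode's `toData`/`fromData` machinery, not a standalone stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for HeartbeatMessage's payload (not the Geode class). */
public class HeartbeatSketch {
  /** Assumed sentinel for unsolicited heartbeats; any negative id would do. */
  public static final int PERIODIC = -1;

  private final int requestId;

  public HeartbeatSketch(int requestId) { this.requestId = requestId; }

  public int getRequestId() { return requestId; }

  /** Negative ids mark periodic heartbeats; others answer a specific request. */
  public boolean isPeriodic() { return requestId < 0; }

  /** Same layout as HeartbeatMessage.toData: a single 4-byte int. */
  public byte[] toBytes() throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(requestId);
    }
    return bytes.toByteArray();
  }

  public static HeartbeatSketch fromBytes(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      return new HeartbeatSketch(in.readInt());
    }
  }

  public static void main(String[] args) throws IOException {
    HeartbeatSketch reply = fromBytes(new HeartbeatSketch(42).toBytes());
    HeartbeatSketch periodic = fromBytes(new HeartbeatSketch(PERIODIC).toBytes());
    System.out.println(reply.getRequestId() + " periodic=" + reply.isPeriodic());       // 42 periodic=false
    System.out.println(periodic.getRequestId() + " periodic=" + periodic.isPeriodic()); // -1 periodic=true
  }
}
```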
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
new file mode 100755
index 0000000..a54b11a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java
@@ -0,0 +1,64 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+
+public class HeartbeatRequestMessage extends HighPriorityDistributionMessage{
+
+  int requestId;
+  InternalDistributedMember target;
+  
+  public HeartbeatRequestMessage(InternalDistributedMember neighbour, int id) {
+    requestId = id;
+    this.target = neighbour;
+  }
+  
+  public HeartbeatRequestMessage(){}
+  
+  public InternalDistributedMember getTarget() {
+    return target;
+  }
+  
+  @Override
+  public int getDSFID() {
+    return HEARTBEAT_REQUEST;
+  }
+
+  @Override
+  protected void process(DistributionManager dm) {
+    throw new IllegalStateException("this message is not intended to execute in a thread pool");
+  }   
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()+" [requestId=" + requestId + "]";
+  }
+
+  public int getRequestId() {
+    return requestId;
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }  
+  
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(requestId);
+    DataSerializer.writeObject(target, out);
+  }
+  
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    requestId = in.readInt();
+    target = DataSerializer.readObject(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
index a5be893..63aceb9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java
@@ -15,25 +15,32 @@ import com.gemstone.gemfire.internal.InternalDataSerializer;
 
 public class InstallViewMessage extends HighPriorityDistributionMessage {
 
+  enum messageType {
+    INSTALL, PREPARE, SYNC
+  }
   private NetView view;
   private Object credentials;
-  private boolean preparing;
+  private messageType kind;
 
   public InstallViewMessage(NetView view, Object credentials) {
     this.view = view;
-    this.preparing = false;
+    this.kind = messageType.INSTALL;
     this.credentials = credentials;
   }
 
   public InstallViewMessage(NetView view, Object credentials, boolean preparing) {
     this.view = view;
-    this.preparing = preparing;
+    this.kind = preparing? messageType.PREPARE : messageType.INSTALL;
     this.credentials = credentials;
   }
   
   public InstallViewMessage() {
     // no-arg constructor for serialization
   }
+  
+  public boolean isRebroadcast() {
+    return kind == messageType.SYNC;
+  }
 
   public NetView getView() {
     return view;
@@ -44,7 +51,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
   }
 
   public boolean isPreparing() {
-    return preparing;
+    return kind == messageType.PREPARE;
   }
 
   @Override
@@ -60,22 +67,22 @@ public class InstallViewMessage extends HighPriorityDistributionMessage {
   @Override
   public void toData(DataOutput out) throws IOException {
     super.toData(out);
+    out.writeInt(kind.ordinal());
     DataSerializer.writeObject(this.view, out);
     DataSerializer.writeObject(this.credentials, out);
-    out.writeBoolean(preparing);
   }
 
   @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
+    this.kind = messageType.values()[in.readInt()];
     this.view = DataSerializer.readObject(in);
     this.credentials = DataSerializer.readObject(in);
-    this.preparing = in.readBoolean();
   }
 
   @Override
   public String toString() {
-    return "InstallViewMessage(preparing="+this.preparing+"; "+this.view
+    return "InstallViewMessage(type="+this.kind+"; "+this.view
             +"; cred="+(credentials==null?"null": "not null")
              +")";
   }

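`InstallViewMessage` now writes `kind.ordinal()` as an int before the view and credentials, and reads it back with `messageType.values()[in.readInt()]`. Ordinal encoding is compact, but it ties the wire format to the declaration order of the enum. Constants can therefore only be appended, never reordered or removed. An out-of-range value read from the wire would also surface as an `ArrayIndexOutOfBoundsException`. Below is a sketch of the same round trip with an explicit range check on decode. `KindCodec`, `ViewMessageKind`, `writeKind` and `readKind` are hypothetical names chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class KindCodec {
  /** Mirrors InstallViewMessage.messageType; declaration order is part of the wire format. */
  enum ViewMessageKind { INSTALL, PREPARE, SYNC }

  static void writeKind(ViewMessageKind kind, DataOutput out) throws IOException {
    out.writeInt(kind.ordinal());
  }

  static ViewMessageKind readKind(DataInput in) throws IOException {
    int ordinal = in.readInt();
    ViewMessageKind[] kinds = ViewMessageKind.values();
    if (ordinal < 0 || ordinal >= kinds.length) {
      // Fail with a descriptive error instead of ArrayIndexOutOfBoundsException.
      throw new IOException("unknown view message kind ordinal " + ordinal);
    }
    return kinds[ordinal];
  }

  static ViewMessageKind roundTrip(ViewMessageKind kind) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      writeKind(kind, out);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return readKind(in);
    }
  }

  public static void main(String[] args) throws IOException {
    for (ViewMessageKind k : ViewMessageKind.values()) {
      System.out.println(k + " -> " + roundTrip(k));
    }
  }
}
```

Writing the discriminator first, as the diff does, lets a reader decide how to interpret the rest of the message before deserializing the view.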
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
index a553d39..e90d7c8 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
@@ -51,6 +51,6 @@ public class SuspectRequest {
 
   @Override
   public String toString() {
-    return "SuspectRequest [suspectMemebr=" + suspectMember + ", reason=" + reason + "]";
+    return "SuspectRequest [member=" + suspectMember + ", reason=" + reason + "]";
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index bd21629..65d6f05 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -246,8 +246,6 @@ public class JGroupsMessenger implements Messenger {
     // create the configuration XML string for JGroups
     String properties = this.jgStackConfig;
     
-    logger.debug("JGroups configuration: {}", properties);
-    
     long start = System.currentTimeMillis();
     
     // start the jgroups channel and establish the membership ID
@@ -255,10 +253,22 @@ public class JGroupsMessenger implements Messenger {
     try {
       Object oldChannel = services.getConfig().getTransport().getOldDSMembershipInfo();
       if (oldChannel != null) {
+        logger.debug("Reusing JGroups channel from previous system", properties);
+        
         myChannel = (JChannel)oldChannel;
+        // scrub the old channel
+        ViewId vid = new ViewId(new JGAddress(), 0);
+        View jgv = new View(vid, new ArrayList<Address>());
+        this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
+        UUID logicalAddress = (UUID)myChannel.getAddress();
+        if (logicalAddress instanceof JGAddress) {
+          ((JGAddress)logicalAddress).setVmViewId(-1);
+        }
         reconnecting = true;
       }
       else {
+        logger.debug("JGroups configuration: {}", properties);
+        
         checkForWindowsIPv6();
         InputStream is = new ByteArrayInputStream(properties.getBytes("UTF-8"));
         myChannel = new JChannel(is);
@@ -291,7 +301,7 @@ public class JGroupsMessenger implements Messenger {
     
     establishLocalAddress();
     
-    logger.info("JGroups channel created (took {}ms)", System.currentTimeMillis()-start);
+    logger.info("JGroups channel {} (took {}ms)", (reconnecting? "reinitialized" : "created"), System.currentTimeMillis()-start);
     
   }
   
@@ -492,7 +502,7 @@ public class JGroupsMessenger implements Messenger {
           && (msg.getMulticast() || allDestinations);
     }
     
-    if (logger.isDebugEnabled()) {
+    if (logger.isDebugEnabled() && reliably) {
       String recips = "multicast";
       if (!useMcast) {
         recips = Arrays.toString(msg.getRecipients());
@@ -728,10 +738,10 @@ public class JGroupsMessenger implements Messenger {
     InternalDistributedMember sender = null;
 
     Exception problem = null;
+    byte[] buf = jgmsg.getRawBuffer();
     try {
       long start = services.getStatistics().startMsgDeserialization();
       
-      byte[] buf = jgmsg.getRawBuffer();
       DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, 
           jgmsg.getOffset(), jgmsg.getLength()));
 
@@ -751,9 +761,6 @@ public class JGroupsMessenger implements Messenger {
       }
       
       services.getStatistics().endMsgDeserialization(start);
-
-      logger.trace("JGroupsReceiver deserialized {}", result);
-
     }
     catch (ClassNotFoundException | IOException | RuntimeException e) {
       problem = e;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3034be6/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
index 7431fe7..c472e06 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/StatRecorder.java
@@ -2,6 +2,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger;
 
 import org.apache.logging.log4j.Logger;
 import org.jgroups.Event;
+import org.jgroups.Header;
 import org.jgroups.Message;
 import org.jgroups.conf.ClassConfigurator;
 import org.jgroups.protocols.FRAG2;
@@ -12,6 +13,7 @@ import org.jgroups.protocols.pbcast.NakAckHeader2;
 import org.jgroups.stack.Protocol;
 
 import com.gemstone.gemfire.distributed.internal.DMStats;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
 import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 
 /**
@@ -113,10 +115,32 @@ public class StatRecorder extends Protocol {
   }
   
   private void filter(Message msg, int direction) {
-    FragHeader hdr = (FragHeader)msg.getHeader(frag2HeaderId);
-    if (hdr != null) {
-      String str = direction == OUTGOING? "sending" : "receiving";
-      logger.debug("{} fragment {} msg offset {} msg size {}", str, hdr, msg.getOffset(), msg.getLength());
+    if (direction == INCOMING) {
+      Header h = msg.getHeader(frag2HeaderId);
+      boolean copyBuffer = false;
+      if (h != null && h instanceof FragHeader) {
+        copyBuffer = true;
+//      String str = direction == OUTGOING? "sending" : "receiving";
+//      logger.debug("{} fragment {} msg buffer hash {}  offset {} msg size {} first bytes=\n{}", str, hdr, 
+//          msg.getRawBuffer().hashCode(), msg.getOffset(), msg.getLength(),
+//          GMSUtil.formatBytes(msg.getRawBuffer(), msg.getOffset(),
+//              Math.min(200, msg.getLength())));
+      } else {
+        h = msg.getHeader(unicastHeaderId);
+        if (h instanceof UNICAST3.Header) {
+          copyBuffer = true;
+        } else {
+          h = msg.getHeader(nakackHeaderId);
+          if (h instanceof NakAckHeader2) {
+            copyBuffer = true;
+          }
+        }
+      }
+      if (copyBuffer) {
+        // JGroups doesn't copy its message buffer when thread pools are
+        // disabled.  This causes Frag2 fragments to become corrupted
+        msg.setBuffer(msg.getBuffer(), 0, msg.getLength());
+      }
     }
   }
 }

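The comment in `StatRecorder.filter` gives the reason for this change. With the JGroups thread pools disabled, incoming message buffers are not copied, and FRAG2 fragments can become corrupted. The stated fix is to give such messages their own bytes. Below is a plain-Java sketch of the underlying aliasing hazard, with no JGroups involved. A receive buffer that is reused for the next packet overwrites any earlier message that still points into it. A region copied out with `Arrays.copyOfRange` survives. `BufferAliasing`, `MessageView` and the reuse scenario are hypothetical and only model the problem; they do not reproduce JGroups internals.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BufferAliasing {
  /** Hypothetical message that refers to a region of some byte array. */
  static final class MessageView {
    final byte[] buf;
    final int offset;
    final int length;

    MessageView(byte[] buf, int offset, int length) {
      this.buf = buf;
      this.offset = offset;
      this.length = length;
    }

    /** Returns a message backed by a private copy of just this region. */
    MessageView detach() {
      return new MessageView(Arrays.copyOfRange(buf, offset, offset + length), 0, length);
    }

    String text() {
      return new String(buf, offset, length, StandardCharsets.US_ASCII);
    }
  }

  public static void main(String[] args) {
    byte[] receiveBuffer = new byte[16]; // reused for every "packet"

    // Packet 1 arrives; keep one aliased view and one detached copy.
    byte[] p1 = "frag-one".getBytes(StandardCharsets.US_ASCII);
    System.arraycopy(p1, 0, receiveBuffer, 0, p1.length);
    MessageView aliased = new MessageView(receiveBuffer, 0, p1.length);
    MessageView copied = aliased.detach();

    // Packet 2 overwrites the shared buffer before packet 1 is reassembled.
    byte[] p2 = "frag-two".getBytes(StandardCharsets.US_ASCII);
    System.arraycopy(p2, 0, receiveBuffer, 0, p2.length);

    System.out.println("aliased: " + aliased.text()); // corrupted: frag-two
    System.out.println("copied:  " + copied.text());  // intact:    frag-one
  }
}
```

The cost of this approach is one allocation and copy per affected message. The commit accepts that cost only for messages carrying FRAG2, UNICAST3 or NAKACK2 headers.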