geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject incubator-geode git commit: first cut for HealthMonitor
Date Fri, 21 Aug 2015 22:35:21 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 8b2ea77d2 -> 8a9294a3c


first cut for HealthMonitor


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8a9294a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8a9294a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8a9294a3

Branch: refs/heads/feature/GEODE-77
Commit: 8a9294a3c6c7c99c7df0f5224358e1b3be321a30
Parents: 8b2ea77
Author: Hitesh Khamesra <hkhamesra@pivotal.io>
Authored: Mon Aug 10 13:41:21 2015 -0700
Committer: Hitesh Khamesra <hkhamesra@pivotal.io>
Committed: Fri Aug 21 15:35:00 2015 -0700

----------------------------------------------------------------------
 .../internal/membership/NetView.java            | 161 ++--
 .../membership/gms/fd/GMSHealthMonitor.java     | 874 ++++++++++++++++---
 .../membership/gms/membership/GMSJoinLeave.java |   5 +-
 .../gms/messages/PingRequestMessage.java        |  56 ++
 .../gms/messages/PingResponseMessage.java       |  54 ++
 .../gms/messages/RemoveMemberMessage.java       |   9 +
 .../gms/messages/SuspectMembersMessage.java     |  76 ++
 .../membership/gms/messages/SuspectRequest.java |  56 ++
 .../gemstone/gemfire/internal/DSFIDFactory.java |   6 +
 .../internal/DataSerializableFixedID.java       |   6 +
 .../membership/GMSHealthMonitorJUnitTest.java   | 439 ++++++++++
 11 files changed, 1586 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index 2a8f248..7b86159 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -13,26 +13,31 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMemberFactory;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
 
-
 /**
- * The NetView class represents a membership view.  Note that 
+ * The NetView class represents a membership view. Note that
  * this class is not synchronized, so take that under advisement
  * if you decide to modify a view with add() or remove().
  * 
- * @since 5.5 
+ * @since 5.5
  */
 public class NetView implements DataSerializableFixedID {
   private static final long serialVersionUID = -8888347937416039434L;
@@ -42,7 +47,12 @@ public class NetView implements DataSerializableFixedID {
   private List<InternalDistributedMember> crashedMembers;
   private InternalDistributedMember creator;
   private Set<InternalDistributedMember> hashedMembers;
-  
+  static final private Random rd = new Random();
+
+  // TODO:need to clear this
+  /** membership logger */
+  private static final Logger logger = Services.getLogger();
+
   public NetView() {
     viewId = 0;
     members = new ArrayList<InternalDistributedMember>(4);
@@ -60,11 +70,12 @@ public class NetView implements DataSerializableFixedID {
     shutdownMembers = Collections.EMPTY_LIST;
     crashedMembers = Collections.EMPTY_LIST;
     this.creator = creator;
+    int seed = creator.hashCode() + (int) System.currentTimeMillis();
   }
 
   // legacy method for JGMM
   public NetView(int size, long viewId) {
-    this.viewId = (int)viewId;
+    this.viewId = (int) viewId;
     members = new ArrayList<InternalDistributedMember>(size);
     this.hashedMembers = new HashSet<InternalDistributedMember>(members);
     shutdownMembers = Collections.EMPTY_LIST;
@@ -80,10 +91,8 @@ public class NetView implements DataSerializableFixedID {
     this.shutdownMembers = new ArrayList<InternalDistributedMember>(other.shutdownMembers);
     this.crashedMembers = new ArrayList<InternalDistributedMember>(other.crashedMembers);
   }
-  public NetView(InternalDistributedMember creator,
-      int viewId,
-      List<InternalDistributedMember> mbrs,
-      List<InternalDistributedMember> shutdowns,
+
+  public NetView(InternalDistributedMember creator, int viewId, List<InternalDistributedMember> mbrs, List<InternalDistributedMember> shutdowns,
       List<InternalDistributedMember> crashes) {
     this.creator = creator;
     this.viewId = viewId;
@@ -92,15 +101,15 @@ public class NetView implements DataSerializableFixedID {
     this.shutdownMembers = shutdowns;
     this.crashedMembers = crashes;
   }
-  
+
   public int getViewId() {
     return this.viewId;
   }
-  
+
   public InternalDistributedMember getCreator() {
     return this.creator;
   }
-  
+
   public void setCreator(InternalDistributedMember creator) {
     this.creator = creator;
   }
@@ -108,7 +117,7 @@ public class NetView implements DataSerializableFixedID {
   public List<InternalDistributedMember> getMembers() {
     return Collections.unmodifiableList(this.members);
   }
-  
+
   /**
    * return members that are i this view but not the given old view
    */
@@ -123,48 +132,48 @@ public class NetView implements DataSerializableFixedID {
    */
   public List<InternalDistributedMember> getNewMembers() {
     List<InternalDistributedMember> result = new ArrayList<InternalDistributedMember>(5);
-    for (InternalDistributedMember mbr: this.members) {
+    for (InternalDistributedMember mbr : this.members) {
       if (mbr.getVmViewId() == this.viewId) {
         result.add(mbr);
       }
     }
     return result;
   }
-  
+
   public Object get(int i) {
     return this.members.get(i);
   }
-  
+
   public void add(InternalDistributedMember mbr) {
     this.hashedMembers.add(mbr);
     this.members.add(mbr);
   }
-  
+
   public boolean remove(InternalDistributedMember mbr) {
     this.hashedMembers.remove(mbr);
     return this.members.remove(mbr);
   }
-  
+
   public boolean contains(InternalDistributedMember mbr) {
     return this.hashedMembers.contains(mbr);
   }
-  
+
   public int size() {
     return this.members.size();
   }
-  
+
   public InternalDistributedMember getLeadMember() {
-    for (InternalDistributedMember mbr: this.members) {
+    for (InternalDistributedMember mbr : this.members) {
       if (mbr.getVmKind() == DistributionManager.NORMAL_DM_TYPE) {
         return mbr;
       }
     }
     return null;
   }
-  
+
   public InternalDistributedMember getCoordinator() {
-    synchronized(members) {
-      for (InternalDistributedMember addr: members) {
+    synchronized (members) {
+      for (InternalDistributedMember addr : members) {
         if (addr.getNetMember().preferredForCoordinator()) {
           return addr;
         }
@@ -176,14 +185,51 @@ public class NetView implements DataSerializableFixedID {
     return null;
   }
 
+  /***
+   * This functions returns the list of all preferred coordinators.
+   * One random member from list of non-preferred member list. It make
+   * sure that random member is not in suspected Set.
+   * And local member.
+   * 
+   * @param filter Suspect member set.
+   * @param localAddress
+   * @return list of preferred coordinators
+   */
+  public List<InternalDistributedMember> getAllPreferredCoordinators(Set<InternalDistributedMember> filter, InternalDistributedMember localAddress) {
+    List<InternalDistributedMember> results = new ArrayList<InternalDistributedMember>();
+    List<InternalDistributedMember> notPreferredCoordinatorList = new ArrayList<InternalDistributedMember>();
+
+    synchronized (members) {
+      for (InternalDistributedMember addr : members) {
+        if (addr.equals(localAddress)) {
+          continue;// this is must to add
+        }
+        if (addr.getNetMember().preferredForCoordinator()) {
+          results.add(addr);// add all preferred coordinator
+        } else if (!filter.contains(addr)) {
+          notPreferredCoordinatorList.add(addr);
+        }
+      }
+
+      results.add(localAddress);// to add local address
+
+      if (notPreferredCoordinatorList.size() > 0) {
+        int idx = rd.nextInt(notPreferredCoordinatorList.size());
+        results.add(notPreferredCoordinatorList.get(idx)); // to add non preferred local address
+      }
+    }
+
+    return results;
+  }
+
   public List<InternalDistributedMember> getShutdownMembers() {
     return this.shutdownMembers;
   }
-  
+
   public List<InternalDistributedMember> getCrashedMembers() {
     return this.crashedMembers;
   }
-  
+
   /** check to see if the given address is next in line to be coordinator */
   public boolean shouldBeCoordinator(InternalDistributedMember who) {
     Iterator<InternalDistributedMember> it = this.members.iterator();
@@ -196,21 +242,21 @@ public class NetView implements DataSerializableFixedID {
         firstNonPreferred = mbr;
       }
     }
-    return (firstNonPreferred == null  ||  firstNonPreferred.equals(who));
+    return (firstNonPreferred == null || firstNonPreferred.equals(who));
   }
-  
+
   /**
    * returns the weight of the members in this membership view
    */
   public int memberWeight() {
     int result = 0;
     InternalDistributedMember lead = getLeadMember();
-    for (InternalDistributedMember mbr: this.members) {
+    for (InternalDistributedMember mbr : this.members) {
       result += mbr.getNetMember().getMemberWeight();
       switch (mbr.getVmKind()) {
       case DistributionManager.NORMAL_DM_TYPE:
         result += 10;
-        if (lead != null  &&  mbr.equals(lead)) {
+        if (lead != null && mbr.equals(lead)) {
           result += 5;
         }
         break;
@@ -233,15 +279,15 @@ public class NetView implements DataSerializableFixedID {
   public int getCrashedMemberWeight(NetView oldView) {
     int result = 0;
     InternalDistributedMember lead = oldView.getLeadMember();
-    for (InternalDistributedMember mbr: this.crashedMembers) {
-      if ( !oldView.contains(mbr)) {
+    for (InternalDistributedMember mbr : this.crashedMembers) {
+      if (!oldView.contains(mbr)) {
         continue;
       }
       result += mbr.getNetMember().getMemberWeight();
       switch (mbr.getVmKind()) {
       case DistributionManager.NORMAL_DM_TYPE:
         result += 10;
-        if (lead != null  &&  mbr.equals(lead)) {
+        if (lead != null && mbr.equals(lead)) {
           result += 5;
         }
         break;
@@ -257,38 +303,36 @@ public class NetView implements DataSerializableFixedID {
     return result;
   }
 
-  
   /**
    * returns the members of this views crashedMembers collection
-   * that were members of the given view.  Admin-only members are
+   * that were members of the given view. Admin-only members are
    * not counted
    */
   public List<InternalDistributedMember> getActualCrashedMembers(NetView oldView) {
     List<InternalDistributedMember> result = new ArrayList(this.crashedMembers.size());
     InternalDistributedMember lead = oldView.getLeadMember();
-    for (InternalDistributedMember mbr: this.crashedMembers) {
-      if ((mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE)
-          && oldView.contains(mbr)) {
+    for (InternalDistributedMember mbr : this.crashedMembers) {
+      if ((mbr.getVmKind() != DistributionManager.ADMIN_ONLY_DM_TYPE) && oldView.contains(mbr)) {
         result.add(mbr);
       }
     }
     return result;
   }
-  
+
   /**
    * logs the weight of failed members wrt the given previous
    * view
    */
   public void logCrashedMemberWeights(NetView oldView, Logger log) {
     InternalDistributedMember lead = oldView.getLeadMember();
-    for (InternalDistributedMember mbr: this.crashedMembers) {
-      if ( !oldView.contains(mbr)) {
+    for (InternalDistributedMember mbr : this.crashedMembers) {
+      if (!oldView.contains(mbr)) {
         continue;
       }
       int mbrWeight = mbr.getNetMember().getMemberWeight();
       switch (mbr.getVmKind()) {
       case DistributionManager.NORMAL_DM_TYPE:
-        if (lead != null  &&  mbr.equals(lead)) {
+        if (lead != null && mbr.equals(lead)) {
           mbrWeight += 15;
         } else {
           mbrWeight += 5;
@@ -305,35 +349,38 @@ public class NetView implements DataSerializableFixedID {
       log.info("  " + mbr + " had a weight of " + mbrWeight);
     }
   }
-  
+
   public String toString() {
     InternalDistributedMember lead = getLeadMember();
-    
+
     StringBuilder sb = new StringBuilder(200);
     sb.append("View[").append(creator).append('|').append(viewId).append("] members: [");
     boolean first = true;
-    for (InternalDistributedMember mbr: this.members) {
-      if (!first) sb.append(", ");
+    for (InternalDistributedMember mbr : this.members) {
+      if (!first)
+        sb.append(", ");
       sb.append(mbr);
       if (mbr == lead) {
         sb.append("{lead}");
       }
       first = false;
     }
-    if ( !this.shutdownMembers.isEmpty() ) {
+    if (!this.shutdownMembers.isEmpty()) {
       sb.append("]  shutdown: [");
       first = true;
-      for (InternalDistributedMember mbr: this.shutdownMembers) {
-        if (!first) sb.append(", ");
+      for (InternalDistributedMember mbr : this.shutdownMembers) {
+        if (!first)
+          sb.append(", ");
         sb.append(mbr);
         first = false;
       }
     }
-    if ( !this.crashedMembers.isEmpty() ) {
+    if (!this.crashedMembers.isEmpty()) {
       sb.append("]  crashed: [");
       first = true;
-      for (InternalDistributedMember mbr: this.crashedMembers) {
-        if (!first) sb.append(", ");
+      for (InternalDistributedMember mbr : this.crashedMembers) {
+        if (!first)
+          sb.append(", ");
         sb.append(mbr);
         first = false;
       }
@@ -347,11 +394,12 @@ public class NetView implements DataSerializableFixedID {
     if (arg0 == this) {
       return true;
     }
-    if ( !(arg0 instanceof NetView) ) {
+    if (!(arg0 instanceof NetView)) {
       return false;
     }
-    return this.members.equals(((NetView)arg0).getMembers());
+    return this.members.equals(((NetView) arg0).getMembers());
   }
+
   @Override
   public synchronized int hashCode() {
     return this.members.hashCode();
@@ -375,7 +423,7 @@ public class NetView implements DataSerializableFixedID {
     shutdownMembers = DataSerializer.readArrayList(in);
     crashedMembers = DataSerializer.readArrayList(in);
   }
-  
+
   /** this will deserialize as an ArrayList */
   private void writeAsArrayList(List list, DataOutput out) throws IOException {
     int size;
@@ -386,7 +434,7 @@ public class NetView implements DataSerializableFixedID {
     }
     InternalDataSerializer.writeArrayLength(size, out);
     if (size > 0) {
-      for (int i=0; i < size; i++) {
+      for (int i = 0; i < size; i++) {
         DataSerializer.writeObject(list.get(i), out);
       }
     }
@@ -402,4 +450,3 @@ public class NetView implements DataSerializableFixedID {
     return NETVIEW;
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index db5779b..c10f2b6 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1,98 +1,776 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
-
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
-
-/** Failure Detection */
-public class GMSHealthMonitor implements HealthMonitor {
-
-  private Services services;
-  private NetView currentView;
-
-  public static void loadEmergencyClasses() {
-  }
-
-  @Override
-  public void contactedBy(InternalDistributedMember sender) {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public void suspect(InternalDistributedMember mbr, String reason) {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) {
-    // TODO Auto-generated method stub
-    return true;
-  }
-
-  public void playDead(boolean b) {
-    // TODO Auto-generated method stub
-    
-  }
-  
-  public void start() {
-  }
-
-  public void installView(NetView newView) {
-    currentView = newView;
-  }
-
-  @Override
-  public void init(Services s) {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void started() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void stop() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void stopped() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void beSick() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void playDead() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void beHealthy() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void emergencyClose() {
-    // TODO Auto-generated method stub
-    
-  }
-
-  @Override
-  public void memberShutdown(DistributedMember mbr, String reason) {
-  }
-
-}
+package com.gemstone.gemfire.distributed.internal.membership.gms.fd;
+
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.PING_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.PING_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
+
+/**
+ * Failure Detection
+ * 
+ * This class make sure that each member is alive and communicating to this member.
+ * To make sure that we create the ring of members based on current view. On this
+ * ring, each member make sure that next-member in ring is communicating with it.
+ * For that we record last message timestamp from next-member. And if it sees this
+ * member has not communicated in last period(member-timeout) then we check whether
+ * this member is still alive or not. Based on that we informed probable coordinators
+ * to remove that member from view.
+ * 
+ * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
+ * to initiate suspect processing for any member. First is checks whether member is
+ * responding or not. Then it informs to probable coordinators to remove that member from
+ * view.
+ * 
+ * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
+ * if that member is alive. Then based on removal flag it initiate the suspect processing
+ * for that member.
+ * 
+ * */
+public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
+
+  private Services services;
+  volatile private NetView currentView;
+  volatile private InternalDistributedMember nextNeighbour;
+
+  long memberTimeout;
+  volatile private boolean isStopping = false;
+  private final AtomicInteger requestId = new AtomicInteger();
+
+  /** membership logger */
+  private static Logger logger = Services.getLogger();
+
+  /**
+   * Member activity will be recorded per interval/period. Timer task will set interval's starting time.
+   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL will be configured
+   * via system property. Default will be 10. Atleast 1 interval is needed.
+   */
+  private static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 10) > 1 ? Integer.getInteger(
+      "geode.logical-message-received-interval", 10) : 10;
+
+  /** stall time to wait for members leaving concurrently */
+  private static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200);
+
+  /**
+   * If member don't see any activity from particular member then it sends check request
+   * to that member. And then it waits for "geode.member-check-timeout" time for response
+   * from it. If that member doesn't respond then it issues suspect request for it. 
+   */
+  private static final long MEMBER_CHECK_TIMEOUT = Long.getLong("geode.member-check-timeout", 100);
+
+  volatile long currentTimeStamp;
+
+  final private Map<InternalDistributedMember, CustomTimeStamp> memberVsLastMsgTS = new ConcurrentHashMap<>();
+  final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
+  final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberVsView = new ConcurrentHashMap<>();
+  final private Map<NetView, Set<SuspectRequest>> viewVsSuspectedMembers = new HashMap<>();
+
+  private ScheduledExecutorService scheduler;
+
+  private ExecutorService pingExecutor;
+
+  List<SuspectRequest> suspectRequests = new ArrayList<SuspectRequest>();
+  private RequestCollector<SuspectRequest> suspectRequestCollectorThread;
+
+  /**
+   * to stop check scheduler
+   */
+  private ScheduledFuture monitorFuture;
+
+  public GMSHealthMonitor() {
+
+  }
+
+  public static void loadEmergencyClasses() {
+  }
+
+  /*
+   * It records the member activity for current time interval.
+   */
+  @Override
+  public void contactedBy(InternalDistributedMember sender) {
+    CustomTimeStamp cTS = memberVsLastMsgTS.get(sender);
+    if (cTS != null) {
+      cTS.setTimeStamp(currentTimeStamp);
+    }
+  }
+
+  /**
+   * this class is to avoid garbage
+   */
+  private static class CustomTimeStamp {
+    volatile long timeStamp;
+
+    public long getTimeStamp() {
+      return timeStamp;
+    }
+
+    public void setTimeStamp(long timeStamp) {
+      this.timeStamp = timeStamp;
+    }
+  }
+
+  /***
+   * This class sets start interval timestamp to record the activity of all members.
+   * That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
+   * record the activity of member.
+   * 
+   * It initiates the suspect processing for next neighbour if it doesn't see any activity from that
+   * member in last interval(member-timeout)
+   */
+  private class Monitor implements Runnable {
+    final long memberTimeoutInMillis;
+
+    public Monitor(long memberTimeout) {
+      memberTimeoutInMillis = memberTimeout;
+    }
+
+    @Override
+    public void run() {
+
+      InternalDistributedMember neighbour = nextNeighbour;
+      if (GMSHealthMonitor.this.isStopping) {
+        return;
+      }
+      long currentTime = System.currentTimeMillis();
+      //this is the start of interval to record member activity
+      GMSHealthMonitor.this.currentTimeStamp = currentTime;
+
+      if (neighbour != null) {
+        CustomTimeStamp nextNeigbourTS = GMSHealthMonitor.this.memberVsLastMsgTS.get(neighbour);
+
+        long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
+        long lastTS = currentTime - nextNeigbourTS.getTimeStamp();
+        if (lastTS + interval >= memberTimeoutInMillis) {
+          logger.debug("Checking member {} ", neighbour);
+          // now do check request for this member;
+          checkMember(neighbour);
+        }
+      }
+    }
+  }
+
+  /***
+   * Check thread waits on this object for response. It puts requestId in requestIdVsResponse map.
+   * Response will have requestId, which is used to get ResponseObject. Then it is used to
+   * notify waiting thread.
+   */
+  private class Response {
+    private DistributionMessage responseMsg;
+
+    public DistributionMessage getResponseMsg() {
+      return responseMsg;
+    }
+
+    public void setResponseMsg(DistributionMessage responseMsg) {
+      this.responseMsg = responseMsg;
+    }
+
+  }
+
+  private PingRequestMessage constructPingRequestMessage(final InternalDistributedMember pingMember) {
+    final int reqId = requestId.getAndIncrement();
+    final PingRequestMessage prm = new PingRequestMessage(reqId);
+    prm.setRecipient(pingMember);
+
+    return prm;
+  }
+
+  private void checkMember(final InternalDistributedMember pingMember) {
+    final NetView cv = GMSHealthMonitor.this.currentView;
+
+    // as ping may take time
+    setNextNeighbour(cv, pingMember);
+
+    // we need to ping this member
+    pingExecutor.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        boolean pinged = GMSHealthMonitor.this.doCheckMember(pingMember);
+        if (!pinged) {
+          String reason = String.format("Member isn't responding to check message: {}", pingMember);
+          GMSHealthMonitor.this.sendSuspectMessage(pingMember, reason);
+        } else {
+          logger.debug("Setting next neighbour as member {} not responded.", pingMember);
+          // back to previous one
+          setNextNeighbour(GMSHealthMonitor.this.currentView, null);
+        }
+      }
+    });
+
+  }
+
+  private void sendSuspectMessage(InternalDistributedMember mbr, String reason) {
+    logger.debug(reason);
+    SuspectRequest sr = new SuspectRequest(mbr, reason);
+    List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
+    sl.add(sr);
+    sendSuspectRequest(sl);
+  }
+
+  /**
+   * This method sends check request to other member and waits for {@link #MEMBER_CHECK_TIMEOUT}
+   * time for response. If it doesn't see response then it returns false.
+   * @param pingMember
+   * @return
+   */
+  private boolean doCheckMember(InternalDistributedMember pingMember) {
+    //TODO: need to some tcp check
+    logger.debug("Checking the member: {}", pingMember);
+    final PingRequestMessage prm = constructPingRequestMessage(pingMember);
+    final Response pingResp = new Response();
+    requestIdVsResponse.put(prm.getRequestId(), pingResp);
+    try {
+      Set<InternalDistributedMember> membersNotReceivedMsg = this.services.getMessenger().send(prm);
+      // TODO: send is throwing exception
+      if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(pingMember)) {
+        // member is not part of current view.
+        logger.debug("Member {} is not part of current view.", pingMember);
+      } else {
+        synchronized (pingResp) {
+          if (pingResp.getResponseMsg() == null) {
+            pingResp.wait(MEMBER_CHECK_TIMEOUT);
+          }
+          if (pingResp.getResponseMsg() == null) {
+            return false;
+          } else {
+            return true;
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", pingMember);
+    } finally {
+      requestIdVsResponse.remove(prm.getRequestId());
+    }
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.gemstone.gemfire.distributed.internal.membership.gms.fd.HealthMonitor#suspectMember(com.gemstone.gemfire.distributed.DistributedMember,
+   * java.lang.String)
+   */
+  @Override
+  public void suspect(InternalDistributedMember mbr, String reason) {
+    synchronized (suspectRequests) {
+      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
+      if (!suspectRequests.contains(sr)) {
+        logger.debug("Adding member {} to suspect for reason {}.", mbr, reason);
+        suspectRequests.add(sr);
+        suspectRequests.notify();
+      }
+    }
+  }
+
+  @Override
+  public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) {
+    boolean pinged = doCheckMember((InternalDistributedMember) mbr);
+    if (!pinged && initiateRemoval) {
+      suspect((InternalDistributedMember) mbr, reason);
+    }
+    return pinged;
+  }
+
+  public void playDead(boolean b) {
+
+  }
+
+  /**
+   * Starts the background machinery of the health monitor:
+   * a single daemon scheduler thread that runs the {@code Monitor} task every
+   * {@code memberTimeout / LOGICAL_INTERVAL} ms, a cached pool of daemon
+   * threads used for member checks, and the daemon thread that batches suspect
+   * requests and passes them to sendSuspectRequest.
+   */
+  public void start() {
+    ThreadFactory schedulerThreads = new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(Services.getThreadGroup(), r, "Member-Check Scheduler ");
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    scheduler = Executors.newScheduledThreadPool(1, schedulerThreads);
+
+    ThreadFactory checkThreads = new ThreadFactory() {
+      final AtomicInteger nextId = new AtomicInteger();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(Services.getThreadGroup(), r, "Member-Check Thread " + nextId.getAndIncrement());
+        t.setDaemon(true);
+        return t;
+      }
+    };
+    pingExecutor = Executors.newCachedThreadPool(checkThreads);
+
+    Monitor monitorTask = this.new Monitor(memberTimeout);
+    long interval = memberTimeout / LOGICAL_INTERVAL;
+    monitorFuture = scheduler.scheduleAtFixedRate(monitorTask, interval, interval, TimeUnit.MILLISECONDS);
+
+    Callback<SuspectRequest> sendSuspects = new Callback<SuspectRequest>() {
+      @Override
+      public void process(List<SuspectRequest> requests) {
+        GMSHealthMonitor.this.sendSuspectRequest(requests);
+      }
+    };
+    suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Suspect message collector thread", Services.getThreadGroup(), suspectRequests,
+        sendSuspects, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+    suspectRequestCollectorThread.setDaemon(true);
+    suspectRequestCollectorThread.start();
+  }
+
+  /**
+   * Installs a new membership view. This method:
+   * <ul>
+   * <li>clears the per-view suspect bookkeeping;</li>
+   * <li>makes {@code newView} the current view;</li>
+   * <li>recomputes the neighbour this member watches.</li>
+   * </ul>
+   *
+   * setNextNeighbour decides whether to re-seed the last-message timestamps by
+   * comparing its view argument with {@code currentView}. That field has already
+   * been replaced by the time it is called here, so the comparison always says
+   * "same view". To re-seed the timestamps for exactly the members of the new
+   * view, they are cleared explicitly whenever the view really changed.
+   */
+  public synchronized void installView(NetView newView) {
+    synchronized (viewVsSuspectedMembers) {
+      viewVsSuspectedMembers.clear();
+    }
+    NetView previousView = currentView;
+    currentView = newView;
+    if (!newView.equals(previousView)) {
+      memberVsLastMsgTS.clear();
+    }
+    setNextNeighbour(currentView, null);
+  }
+
+  /**
+   * Chooses the member this process watches: the member that follows
+   * {@code nextTo} in {@code currentView}'s member list, wrapping around at the
+   * end. When {@code nextTo} is null, this member's own id is used as the
+   * anchor. A non-null {@code nextTo} is meant for the case where the current
+   * neighbour is suspected, so the member after it is watched instead. If the
+   * anchor is not in the view, the previous neighbour is kept.
+   *
+   * Last-message timestamps are seeded with the current time for every member
+   * of {@code currentView} when {@code newView} is not equal to
+   * {@code currentView}, or when no timestamps are recorded yet.
+   */
+  private synchronized void setNextNeighbour(NetView newView, InternalDistributedMember nextTo) {
+    InternalDistributedMember anchor = (nextTo != null) ? nextTo : services.getJoinLeave().getMemberID();
+    boolean sameView = newView.equals(currentView);
+
+    List<InternalDistributedMember> members = currentView.getMembers();
+    int anchorIdx = members.indexOf(anchor);
+    if (anchorIdx >= 0) {
+      nextNeighbour = members.get((anchorIdx + 1) % members.size());
+      logger.debug("Next neighbour to check is {}", nextNeighbour);
+    }
+
+    if (sameView && memberVsLastMsgTS.size() != 0) {
+      return; // same view and timestamps already seeded
+    }
+    memberVsLastMsgTS.clear();
+    long now = System.currentTimeMillis();
+    for (InternalDistributedMember member : members) {
+      CustomTimeStamp stamp = new CustomTimeStamp();
+      stamp.setTimeStamp(now);
+      memberVsLastMsgTS.put(member, stamp);
+    }
+  }
+
+  /** Returns the member currently being watched; exposed for tests. */
+  public InternalDistributedMember getNextNeighbour() {
+    return this.nextNeighbour;
+  }
+
+  /**
+   * Stores the service registry and reads the member timeout from its config.
+   * Also registers this monitor with the messenger as the handler for ping
+   * request, ping response and suspect-members messages.
+   */
+  @Override
+  public void init(Services s) {
+    this.services = s;
+    this.memberTimeout = s.getConfig().getMemberTimeout();
+    // route the health-monitor protocol messages to processMessage
+    this.services.getMessenger().addHandler(PingRequestMessage.class, this);
+    this.services.getMessenger().addHandler(PingResponseMessage.class, this);
+    this.services.getMessenger().addHandler(SuspectMembersMessage.class, this);
+  }
+
+  /** No post-start work is needed; intentionally empty. */
+  @Override
+  public void started() {
+    // nothing to do
+  }
+
+  /** Stops the monitor by delegating to stopServices. */
+  @Override
+  public void stop() {
+    this.stopServices();
+  }
+
+  /**
+   * Shuts the monitor down, in this order:
+   * <ol>
+   * <li>marks it as stopping;</li>
+   * <li>cancels the periodic Monitor task and shuts down its scheduler;</li>
+   * <li>wakes every thread blocked waiting for a ping response (they then find
+   * no response and report the check as failed);</li>
+   * <li>shuts down the ping executor;</li>
+   * <li>stops the suspect collector thread.</li>
+   * </ol>
+   *
+   * Safe to call when start() never ran or only partly ran, e.g. from
+   * emergencyClose() after a failed startup. Components that were never
+   * created are skipped instead of causing a NullPointerException.
+   */
+  private void stopServices() {
+    logger.debug("Stopping HealthMonitor");
+    isStopping = true;
+
+    if (monitorFuture != null) {
+      monitorFuture.cancel(true);
+    }
+    if (scheduler != null) {
+      scheduler.shutdown();
+    }
+
+    // release any thread still waiting for a ping response
+    for (Response r : requestIdVsResponse.values()) {
+      synchronized (r) {
+        r.notify();
+      }
+    }
+    if (pingExecutor != null) {
+      pingExecutor.shutdown();
+    }
+
+    if (suspectRequestCollectorThread != null) {
+      suspectRequestCollectorThread.shutdown();
+    }
+  }
+
+  /**
+   * Test hook. Returns true once the scheduler and the ping executor have been
+   * shut down and the suspect collector thread is no longer alive.
+   */
+  public boolean isShutdown() {
+    if (!scheduler.isShutdown()) {
+      return false;
+    }
+    if (!pingExecutor.isShutdown()) {
+      return false;
+    }
+    return !suspectRequestCollectorThread.isAlive();
+  }
+
+  @Override
+  public void stopped() {
+    // no-op
+  }
+
+  @Override
+  public void beSick() {
+    // no-op in this implementation
+  }
+
+  @Override
+  public void playDead() {
+    // no-op in this implementation
+  }
+
+  @Override
+  public void beHealthy() {
+    // no-op in this implementation
+  }
+
+  /** Emergency shutdown uses the same teardown path as a normal stop. */
+  @Override
+  public void emergencyClose() {
+    this.stopServices();
+  }
+
+  /**
+   * Dispatches an incoming health-monitor message by its DSFID. Messages are
+   * ignored once the monitor is stopping.
+   *
+   * @throws IllegalArgumentException for a message type this monitor does not handle
+   */
+  @Override
+  public void processMessage(DistributionMessage m) {
+    if (isStopping) {
+      return;
+    }
+    logger.debug("HealthMonitor processing {}", m);
+
+    final int id = m.getDSFID();
+    if (id == PING_REQUEST) {
+      processPingRequest((PingRequestMessage) m);
+    } else if (id == PING_RESPONSE) {
+      processPingResponse((PingResponseMessage) m);
+    } else if (id == SUSPECT_MEMBERS_MESSAGE) {
+      processSuspectMembersRequest((SuspectMembersMessage) m);
+    } else {
+      throw new IllegalArgumentException("unknown message type: " + m);
+    }
+  }
+
+  /**
+   * Answers a ping request by sending a PingResponseMessage back to its sender.
+   *
+   * The response must echo the request's id. The pinging member finds its
+   * waiting check in {@code requestIdVsResponse} by {@code getRequestId()}. A
+   * response built with the no-arg constructor always carries id 0, so it would
+   * not match the pending request and the check would time out.
+   */
+  private void processPingRequest(PingRequestMessage m) {
+    PingResponseMessage prm = new PingResponseMessage(m.getRequestId());
+    prm.setRecipient(m.getSender());
+    Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(prm);
+    // TODO: send is throwing exception right now
+    if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(m.getSender())) {
+      logger.debug("Unable to send check response to member: {}", m.getSender());
+    }
+  }
+
+  /**
+   * Hands a ping response to the check thread waiting on the matching
+   * {@code Response}, if one is still registered for the request id.
+   */
+  private void processPingResponse(PingResponseMessage m) {
+    final Response waiter = requestIdVsResponse.get(m.getRequestId());
+    final boolean waiting = waiter != null;
+    logger.debug("Got check response from member {}. {}", m.getSender(), waiting ? "Check Thread still waiting" : "Check thread is not waiting");
+    if (!waiting) {
+      return;
+    }
+    synchronized (waiter) {
+      waiter.setResponseMsg(m);
+      waiter.notify();
+    }
+  }
+
+  /**
+   * Processes a suspect request sent by another member of the current view.
+   * Requests from non-members are ignored.
+   *
+   * If this member is the coordinator, it runs a final check on each suspect
+   * and removes those that do not answer. Otherwise the suspects are recorded
+   * against the current view. If removing every suspect recorded for that view
+   * would make this member the coordinator, it runs the final checks itself,
+   * because the present coordinator may be one of the suspects.
+   *
+   * @param incomingRequest the suspect request received from a peer
+   */
+  private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) {
+    NetView cv = currentView;
+    logger.debug("GMSHealthMonitor.processSuspectMembersRequest invoked for members {}", incomingRequest);
+
+    if (cv == null) {
+      return;
+    }
+
+    List<SuspectRequest> sMembers = incomingRequest.getMembers();
+
+    if (!cv.contains(incomingRequest.getSender())) {
+      logger.info("Membership ignoring suspect request for {} from non-member {}", incomingRequest, incomingRequest.getSender());
+      return;
+    }
+
+    InternalDistributedMember localAddress = services.getJoinLeave().getMemberID();
+
+    if (cv.getCoordinator().equals(localAddress)) {
+      doFinalCheck(sMembers, cv, localAddress);
+      return;
+    }
+
+    // Not the coordinator: record these suspects for this view, then work out
+    // who would coordinate once every suspect recorded so far is gone.
+    NetView check = new NetView(cv, cv.getViewId() + 1);
+    ArrayList<SuspectRequest> smbr = new ArrayList<SuspectRequest>();
+    synchronized (viewVsSuspectedMembers) {
+      recordSuspectRequests(sMembers, cv);
+      Set<SuspectRequest> viewVsMembers = viewVsSuspectedMembers.get(cv);
+      for (SuspectRequest sr : viewVsMembers) {
+        check.remove(sr.getSuspectMember());
+        smbr.add(sr);
+      }
+    }
+
+    // NOTE(review): check may have lost every member; guard against a null coordinator
+    InternalDistributedMember prospectiveCoordinator = check.getCoordinator();
+    if (prospectiveCoordinator != null && prospectiveCoordinator.equals(localAddress)) {
+      // this member would be the new coordinator
+      doFinalCheck(smbr, cv, localAddress);
+    }
+  }
+
+  /**
+   * Adds the given suspect requests to the set kept for view {@code cv} in
+   * {@code viewVsSuspectedMembers}, creating that set on first use. This
+   * matters on the preferred coordinators, because the current coordinator may
+   * itself be suspected next. A request for a member that is already recorded
+   * is not added again (SuspectRequest equality is by suspect member).
+   *
+   * @param sMembers suspect requests to record
+   * @param cv the view the requests apply to
+   */
+  private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView cv) {
+    synchronized (viewVsSuspectedMembers) {
+      Set<SuspectRequest> recorded = viewVsSuspectedMembers.get(cv);
+      if (recorded == null) {
+        recorded = new HashSet<SuspectRequest>();
+        viewVsSuspectedMembers.put(cv, recorded);
+      }
+      recorded.addAll(sMembers);
+    }
+  }
+
+  /**
+   * Schedules a final ping on the ping executor for every suspect that is
+   * still in {@code cv} and is not this member. Each check pings the member
+   * and asks JoinLeave to remove it if it does not answer.
+   *
+   * A suspect is skipped when {@code suspectedMemberVsView} already maps it to
+   * {@code cv}. Otherwise it is registered there (if absent) and the entry is
+   * removed when the check finishes.
+   */
+  private void doFinalCheck(List<SuspectRequest> sMembers, NetView cv, InternalDistributedMember localAddress) {
+    for (final SuspectRequest request : sMembers) {
+      final InternalDistributedMember suspect = request.getSuspectMember();
+      if (!cv.contains(suspect) || suspect.equals(localAddress)) {
+        continue; // no longer in the view, or ourselves
+      }
+
+      NetView alreadyChecking = suspectedMemberVsView.putIfAbsent(suspect, cv);
+      if (alreadyChecking != null && alreadyChecking.equals(cv)) {
+        continue; // a final check for this member and view is already scheduled
+      }
+
+      final String reason = request.getReason();
+      logger.debug("Doing final check for member {}", suspect);
+      pingExecutor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            logger.debug("Doing final check for member {}", suspect);
+            if (!GMSHealthMonitor.this.doCheckMember(suspect)) {
+              GMSHealthMonitor.this.services.getJoinLeave().remove(suspect, reason);
+            }
+          } catch (Exception e) {
+            logger.info("Unexpected exception while verifying member", e);
+          } finally {
+            GMSHealthMonitor.this.suspectedMemberVsView.remove(suspect);
+          }
+        }
+      });
+    }
+  }
+
+  /** Receives each batch of requests drained by a RequestCollector. */
+  interface Callback<T> {
+    void process(List<T> requests);
+  }
+
+  /***
+   * Thread that batches requests added to {@code listToTrack}. Once the list is
+   * non-empty, it waits until at least {@code timeout} ms have passed since the
+   * previous batch so more requests can accumulate. It then drains the list
+   * and hands the batch to the callback.
+   *
+   * Waiting is done with {@code listToTrack.wait(...)}, which releases the
+   * list's monitor. Producers that synchronize on the list (e.g. suspect())
+   * and shutdown() therefore are not blocked while the batch window is open.
+   * The previous {@code sleep} held the monitor for up to {@code timeout} ms.
+   *
+   * This thread does not verify suspects itself; the receivers of the batch
+   * are expected to do that.
+   * Open question: should it wait for an ack from the coordinator?
+   */
+  class RequestCollector<T> extends Thread {
+    volatile boolean shutdown = false;
+    final List<T> listToTrack;
+    final Callback<T> callback;
+    final long timeout;
+
+    public RequestCollector(String name, ThreadGroup tg, List<T> l, Callback<T> c, long t) {
+      super(tg, name);
+      listToTrack = l;
+      callback = c;
+      timeout = t;
+    }
+
+    void shutdown() {
+      shutdown = true;
+      synchronized (listToTrack) {
+        listToTrack.notify();
+        interrupt();
+      }
+    }
+
+    boolean isShutdown() {
+      return shutdown;
+    }
+
+    @Override
+    public void run() {
+      List<T> requests = null;
+      logger.info("Suspect thread is starting");
+      long okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
+      try {
+        for (;;) {
+          synchronized (listToTrack) {
+            if (shutdown || services.getCancelCriterion().isCancelInProgress()) {
+              return;
+            }
+            if (listToTrack.isEmpty()) {
+              try {
+                logger.debug("Result collector is waiting");
+                listToTrack.wait();
+              } catch (InterruptedException e) {
+                return;
+              }
+            } else {
+              long now = System.currentTimeMillis();
+              if (now < okayToSendSuspectRequest) {
+                // let more requests arrive; wait() releases the monitor so
+                // producers and shutdown() can proceed in the meantime
+                try {
+                  listToTrack.wait(okayToSendSuspectRequest - now);
+                } catch (InterruptedException e) {
+                  return;
+                }
+                continue;
+              }
+              if (requests == null) {
+                requests = new ArrayList<T>(listToTrack);
+              } else {
+                requests.addAll(listToTrack);
+              }
+              listToTrack.clear();
+              okayToSendSuspectRequest = System.currentTimeMillis() + timeout;
+            }
+          } // synchronized
+          if (requests != null && !requests.isEmpty()) {
+            if (logger != null && logger.isDebugEnabled()) {
+              logger.debug("Health Monitor is sending {} member suspect requests to coordinator", requests.size());
+            }
+            try {
+              callback.process(requests);
+            } catch (RuntimeException e) {
+              // one failed batch must not terminate the collector thread
+              logger.info("Unexpected exception while processing collected requests", e);
+            }
+            requests = null;
+          }
+        }
+      } finally {
+        shutdown = true;
+        logger.info("Suspect thread is stopped");
+      }
+    }
+  }
+
+  /**
+   * Sends a batch of suspect requests to the preferred coordinators of the
+   * current view, excluding the suspects themselves. Any requests still queued
+   * in {@code suspectRequests} are merged into the batch, and that queue is
+   * drained.
+   *
+   * @param requests suspect requests to send; queued requests not already
+   *          present are appended to this list
+   */
+  private void sendSuspectRequest(final List<SuspectRequest> requests) {
+    logger.debug("Sending suspect request for members {}", requests);
+    synchronized (suspectRequests) {
+      // merge every queued request (previously only element 0 was examined)
+      for (SuspectRequest sr : suspectRequests) {
+        if (!requests.contains(sr)) {
+          requests.add(sr);
+        }
+      }
+      suspectRequests.clear();
+    }
+
+    NetView cv = currentView;
+    if (cv == null) {
+      logger.debug("No view installed; not sending suspect request for {}", requests);
+      return;
+    }
+
+    HashSet<InternalDistributedMember> filter = new HashSet<InternalDistributedMember>();
+    for (SuspectRequest sr : requests) {
+      filter.add(sr.getSuspectMember());
+    }
+    List<InternalDistributedMember> recipients = cv.getAllPreferredCoordinators(filter, services.getJoinLeave().getMemberID());
+
+    SuspectMembersMessage rmm = new SuspectMembersMessage(recipients, requests);
+    Set<InternalDistributedMember> failedRecipients = services.getMessenger().send(rmm);
+
+    if (failedRecipients != null && failedRecipients.size() > 0) {
+      logger.info("Unable to send suspect message to {}", failedRecipients);
+    }
+  }
+
+  /** Not implemented yet: the shutdown notification is ignored. */
+  @Override
+  public void memberShutdown(DistributedMember mbr, String reason) {
+    // TODO: react to an orderly member shutdown
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 9450091..6e7abbd 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -864,7 +864,10 @@ logger.info("received join response {}", response);
     NetView v = this.currentView;
     
     if (v != null) {
-      RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
+      Set<InternalDistributedMember> filter = new HashSet<>();
+      filter.add(m);
+      RemoveMemberMessage msg = new RemoveMemberMessage(v.getAllPreferredCoordinators(filter, getMemberID()), 
+          m,
           reason);
       services.getMessenger().send(msg);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingRequestMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingRequestMessage.java
new file mode 100755
index 0000000..95c86b6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingRequestMessage.java
@@ -0,0 +1,56 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Health-monitor ping sent to a member to check that it is responsive. The
+ * only serialized state is the request id.
+ */
+public class PingRequestMessage extends HighPriorityDistributionMessage {
+
+  int requestId;
+
+  /** No-arg constructor for deserialization. */
+  public PingRequestMessage() {
+  }
+
+  public PingRequestMessage(int id) {
+    this.requestId = id;
+  }
+
+  public int getRequestId() {
+    return this.requestId;
+  }
+
+  @Override
+  public int getDSFID() {
+    return PING_REQUEST;
+  }
+
+  /** Always throws: this message must not be run in a DistributionManager thread pool. */
+  @Override
+  protected void process(DistributionManager dm) {
+    throw new IllegalStateException("this message is not intended to execute in a thread pool");
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.requestId);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.requestId = in.readInt();
+  }
+
+  @Override
+  public String toString() {
+    return "PingRequestMessage [requestId=" + requestId + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingResponseMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingResponseMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingResponseMessage.java
new file mode 100755
index 0000000..86e154c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/PingResponseMessage.java
@@ -0,0 +1,54 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Reply to a PingRequestMessage. The only serialized state is the request id
+ * it answers.
+ */
+public class PingResponseMessage extends HighPriorityDistributionMessage {
+  int requestId;
+
+  /** No-arg constructor for deserialization. */
+  public PingResponseMessage() {
+  }
+
+  public PingResponseMessage(int id) {
+    this.requestId = id;
+  }
+
+  public int getRequestId() {
+    return this.requestId;
+  }
+
+  @Override
+  public int getDSFID() {
+    return PING_RESPONSE;
+  }
+
+  /** Always throws: this message must not be run in a DistributionManager thread pool. */
+  @Override
+  protected void process(DistributionManager dm) {
+    throw new IllegalStateException("this message is not intended to execute in a thread pool");
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.requestId);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.requestId = in.readInt();
+  }
+
+  @Override
+  public String toString() {
+    return "PingResponseMessage [requestId=" + requestId + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/RemoveMemberMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/RemoveMemberMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/RemoveMemberMessage.java
index 666b41c..30137a2 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/RemoveMemberMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/RemoveMemberMessage.java
@@ -3,6 +3,8 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -22,6 +24,13 @@ public class RemoveMemberMessage extends HighPriorityDistributionMessage {
     this.reason = reason;
   }
   
+  /** Creates a request to remove member {@code id}, addressed to every member in {@code recipients}. */
+  public RemoveMemberMessage(List<InternalDistributedMember> recipients, InternalDistributedMember id, String reason) {
+    setRecipients(recipients);
+    this.reason = reason;
+    this.memberID = id;
+  }
+  
   public RemoveMemberMessage() {
     // no-arg constructor for serialization
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectMembersMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectMembersMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectMembersMessage.java
new file mode 100755
index 0000000..1c69b47
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectMembersMessage.java
@@ -0,0 +1,76 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Carries a batch of SuspectRequests (suspect member plus reason) between
+ * members for the health monitor.
+ */
+public class SuspectMembersMessage extends HighPriorityDistributionMessage {
+  final List<SuspectRequest> suspectRequests;
+
+  public SuspectMembersMessage(List<InternalDistributedMember> recipient, List<SuspectRequest> s) {
+    super();
+    setRecipients(recipient);
+    this.suspectRequests = s;
+  }
+
+  public SuspectMembersMessage() {
+    // no-arg constructor for serialization
+    suspectRequests = new ArrayList<SuspectRequest>();
+  }
+
+  @Override
+  public int getDSFID() {
+    return SUSPECT_MEMBERS_MESSAGE;
+  }
+
+  @Override
+  public void process(DistributionManager dm) {
+    throw new IllegalStateException("this message is not intended to execute in a thread pool");
+  }
+
+  public List<SuspectRequest> getMembers() {
+    return suspectRequests;
+  }
+
+  @Override
+  public String toString() {
+    return "SuspectMembersMessage [suspectRequests=" + suspectRequests + "]";
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return null;
+  }
+
+  /**
+   * Writes the number of requests, then each request's member and reason in
+   * list order. A null list is written as an empty batch.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    if (suspectRequests == null) {
+      out.writeInt(0);
+      return;
+    }
+    out.writeInt(suspectRequests.size());
+    // serialize each element in turn (not element 0 repeatedly)
+    for (SuspectRequest sr : suspectRequests) {
+      DataSerializer.writeObject(sr.getSuspectMember(), out);
+      DataSerializer.writeString(sr.getReason(), out);
+    }
+  }
+
+  /** Reads the format written by toData, appending each request to the list. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) DataSerializer.readObject(in), DataSerializer.readString(in));
+      suspectRequests.add(sr);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
new file mode 100755
index 0000000..a553d39
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/SuspectRequest.java
@@ -0,0 +1,56 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.messages;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+
+/**
+ * A request to suspect a member, together with the reason. Equality and hash
+ * code depend only on the suspect member, so collections treat two requests
+ * for the same member as duplicates whatever their reasons.
+ */
+public class SuspectRequest {
+  final InternalDistributedMember suspectMember;
+  final String reason;
+
+  public SuspectRequest(InternalDistributedMember m, String r) {
+    suspectMember = m;
+    reason = r;
+  }
+
+  public InternalDistributedMember getSuspectMember() {
+    return suspectMember;
+  }
+
+  public String getReason() {
+    return reason;
+  }
+
+  @Override
+  public int hashCode() {
+    // same value as the former 31 * 1 + memberHash formulation
+    return 31 + ((suspectMember == null) ? 0 : suspectMember.hashCode());
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    return java.util.Objects.equals(suspectMember, ((SuspectRequest) obj).suspectMember);
+  }
+
+  @Override
+  public String toString() {
+    return "SuspectRequest [suspectMember=" + suspectMember + ", reason=" + reason + "]";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index f94c29c..663f495 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -99,7 +99,10 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.messages.Install
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingResponseMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
 import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
 import com.gemstone.gemfire.distributed.internal.streaming.StreamingOperation.StreamingReplyMessage;
 import com.gemstone.gemfire.internal.admin.ClientMembershipMessage;
@@ -471,6 +474,9 @@ public final class DSFIDFactory implements DataSerializableFixedID {
 
   private static void registerDSFIDTypes() {
     registerDSFID(REMOVE_MEMBER_REQUEST, RemoveMemberMessage.class);
+    registerDSFID(PING_REQUEST, PingRequestMessage.class);
+    registerDSFID(PING_RESPONSE, PingResponseMessage.class);
+    registerDSFID(SUSPECT_MEMBERS_MESSAGE, SuspectMembersMessage.class);
     registerDSFID(LEAVE_REQUEST_MESSAGE, LeaveRequestMessage.class);
     registerDSFID(VIEW_ACK_MESSAGE, ViewAckMessage.class);
     registerDSFID(INSTALL_VIEW_MESSAGE, InstallViewMessage.class);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index c512479..12141e0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -74,7 +74,13 @@ public interface DataSerializableFixedID extends SerializationVersions {
       return new FOO(in);
   */
   
+  
+  public static final short SUSPECT_MEMBERS_MESSAGE = -156;
+  
+  public static final short PING_RESPONSE = -155;
+  public static final short PING_REQUEST = -154;
   public static final short REMOVE_MEMBER_REQUEST = -153;
+  
   public static final short LEAVE_REQUEST_MESSAGE = -152;
   
   public static final short VIEW_ACK_MESSAGE = -151;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8a9294a3/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java
new file mode 100644
index 0000000..3d05b89
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSHealthMonitorJUnitTest.java
@@ -0,0 +1,439 @@
+package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.PingResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectRequest;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for {@link GMSHealthMonitor}, run against Mockito mocks of {@link Services},
+ * {@link Messenger} and {@link GMSJoinLeave} and a view of seven members on localhost. Members 0
+ * and 1 are locators preferred as coordinator; member 0 is the view's coordinator.
+ * <p>
+ * Timing assumptions (taken from this class's own comments, not verified here): member-timeout is
+ * stubbed to 1000 ms, the monitor's ping timeout is about 100 ms and its suspect thread acts after
+ * about 200 ms. The "...NotCalled...BeforeTimeout" tests depend on these values; the positive
+ * tests wait (up to {@link #MAX_WAIT_MILLIS}) only until the expected send is observed.
+ * <p>
+ * Message stubs use {@code isA(...)} rather than {@code any(...)}: in Mockito 1.x
+ * {@code any(Class)} matches every argument regardless of its type, so an unrelated send (such as
+ * a ping) would otherwise satisfy the check. Stubs are installed before the view is installed so a
+ * send from one of the monitor's background threads can neither precede nor race the stubbing.
+ */
+@Category(UnitTest.class)
+public class GMSHealthMonitorJUnitTest {
+
+  /**
+   * Upper bound on how long a test waits for an expected message to be sent. The wait returns as
+   * soon as the send is observed, so this only costs time when the test is about to fail.
+   */
+  private static final long MAX_WAIT_MILLIS = 5000L;
+
+  private Services services;
+  private ServiceConfig mockConfig;
+  private DistributionConfig mockDistConfig;
+  private List<InternalDistributedMember> mockMembers;
+  private Messenger messenger;
+  private GMSJoinLeave joinLeave;
+  private GMSHealthMonitor gmsHealthMonitor;
+  /** member-timeout reported by the mocked ServiceConfig, in milliseconds. */
+  private final long memberTimeout = 1000L;
+
+  @Before
+  public void initMocks() throws UnknownHostException {
+    mockDistConfig = mock(DistributionConfig.class);
+    mockConfig = mock(ServiceConfig.class);
+    messenger = mock(Messenger.class);
+    joinLeave = mock(GMSJoinLeave.class);
+    services = mock(Services.class);
+
+    when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig);
+    when(mockConfig.getMemberTimeout()).thenReturn(memberTimeout);
+    when(services.getConfig()).thenReturn(mockConfig);
+    when(services.getMessenger()).thenReturn(messenger);
+    when(services.getJoinLeave()).thenReturn(joinLeave);
+
+    mockMembers = new ArrayList<InternalDistributedMember>();
+    for (int i = 0; i < 7; i++) {
+      InternalDistributedMember mbr = new InternalDistributedMember("localhost", 8888 + i);
+
+      // members 0 and 1 are locators and therefore eligible to become coordinator
+      if (i == 0 || i == 1) {
+        mbr.setVmKind(DistributionManager.LOCATOR_DM_TYPE);
+        mbr.getNetMember().setPreferredForCoordinator(true);
+      }
+      mockMembers.add(mbr);
+    }
+
+    gmsHealthMonitor = new GMSHealthMonitor();
+    gmsHealthMonitor.init(services);
+    gmsHealthMonitor.start();
+  }
+
+  @After
+  public void tearDown() {
+    gmsHealthMonitor.stop();
+  }
+
+  /**
+   * Checks that a PingRequestMessage is answered with a PingResponseMessage.
+   */
+  @Test
+  public void testHMServiceStarted() throws IOException {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(PingResponseMessage.class))).thenAnswer(messageSent);
+
+    gmsHealthMonitor.processMessage(new PingRequestMessage(1));
+    Assert.assertTrue("Ping Response should have been sent", messageSent.awaitExecution(MAX_WAIT_MILLIS));
+  }
+
+  /**
+   * Checks that installing a view asks join/leave for the local member id, which is needed to
+   * pick the next neighbour.
+   */
+  @Test
+  public void testHMNextNeighbour() throws IOException {
+    MethodExecuted memberIdRequested = new MethodExecuted();
+    when(joinLeave.getMemberID()).thenAnswer(memberIdRequested);
+
+    gmsHealthMonitor.installView(createView());
+
+    Assert.assertTrue("It should have got memberID from services.getJoinLeave().getMemberID()", memberIdRequested.isMethodExecuted());
+  }
+
+  /**
+   * Checks that the next neighbour of the local member (member 3) is member 4.
+   */
+  @Test
+  public void testHMNextNeighbourVerify() throws IOException {
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbour());
+  }
+
+  /**
+   * Checks that after one member-timeout the local member (member 3) has moved on from member 4
+   * to member 5 as its neighbour.
+   */
+  @Test
+  public void testHMNextNeighbourAfterTimeout() throws IOException {
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    pause(memberTimeout + 5);
+
+    Assert.assertEquals(mockMembers.get(5), gmsHealthMonitor.getNextNeighbour());
+  }
+
+  /**
+   * Checks that before one member-timeout has elapsed the neighbour is still member 4.
+   */
+  @Test
+  public void testHMNextNeighbourBeforeTimeout() throws IOException {
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    pause(memberTimeout - 200);
+
+    Assert.assertEquals(mockMembers.get(4), gmsHealthMonitor.getNextNeighbour());
+  }
+
+  /**
+   * Checks that the member-check thread sends a SuspectMembersMessage for an unresponsive
+   * neighbour.
+   */
+  @Test
+  public void testSuspectMembersCalledThroughMemberCheckThread() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    // expected roughly one member-timeout plus the ping timeout after the view is installed
+    Assert.assertTrue("SuspectMembersMessage should have sent", messageSent.awaitExecution(MAX_WAIT_MILLIS));
+  }
+
+  /**
+   * Checks that the member-check thread does not send a SuspectMembersMessage before one
+   * member-timeout has elapsed.
+   */
+  @Test
+  public void testSuspectMembersNotCalledThroughPingThreadBeforeTimeout() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    pause(memberTimeout - 200);
+
+    Assert.assertFalse("SuspectMembersMessage shouldn't have sent", messageSent.isMethodExecuted());
+  }
+
+  /**
+   * Checks that the suspect thread sends a SuspectMembersMessage for a member passed to
+   * {@code suspect()}.
+   */
+  @Test
+  public void testSuspectMembersCalledThroughSuspectThread() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+    gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
+
+    // the suspect thread is expected to act after about 200 ms
+    Assert.assertTrue("SuspectMembersMessage should have sent", messageSent.awaitExecution(MAX_WAIT_MILLIS));
+  }
+
+  /**
+   * Checks that the suspect thread does not send a SuspectMembersMessage before its ~200 ms delay.
+   */
+  @Test
+  public void testSuspectMembersNotCalledThroughSuspectThreadBeforeTimeout() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(SuspectMembersMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+    gmsHealthMonitor.suspect(mockMembers.get(1), "Not responding");
+
+    pause(100L);
+
+    Assert.assertFalse("SuspectMembersMessage shouldn't have sent", messageSent.isMethodExecuted());
+  }
+
+  /**
+   * Checks that the coordinator (member 0, also the local member) sends a RemoveMemberMessage for
+   * a suspected member once its final check, bounded by the ping timeout, has completed.
+   */
+  @Test
+  public void testRemoveMemberCalled() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(RemoveMemberMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
+
+    gmsHealthMonitor.installView(createView());
+
+    ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+    recipient.add(mockMembers.get(0));
+    // member 0 reports member 1 as not responding
+    gmsHealthMonitor.processMessage(createSuspectMembersMessage(mockMembers.get(0), recipient, mockMembers.get(1)));
+
+    Assert.assertTrue("RemoveMemberMessage should have sent", messageSent.awaitExecution(MAX_WAIT_MILLIS));
+  }
+
+  /**
+   * Checks that no RemoveMemberMessage is sent before the final check (bounded by the ~100 ms
+   * ping timeout) has had a chance to complete.
+   */
+  @Test
+  public void testRemoveMemberNotCalledBeforeTimeout() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(RemoveMemberMessage.class))).thenAnswer(messageSent);
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(0)); // coordinator and local member
+
+    gmsHealthMonitor.installView(createView());
+
+    ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+    recipient.add(mockMembers.get(0));
+    // member 0 reports member 1 as not responding
+    gmsHealthMonitor.processMessage(createSuspectMembersMessage(mockMembers.get(0), recipient, mockMembers.get(1)));
+
+    // shorter than the ping timeout, so the final check cannot have finished yet
+    pause(90L);
+
+    Assert.assertFalse("RemoveMemberMessage shouldn't have been sent before the final check timed out", messageSent.isMethodExecuted());
+  }
+
+  /**
+   * Checks that when the coordinator (member 0) is suspected, the next preferred coordinator
+   * (member 1, the local member) sends a RemoveMemberMessage after its final check.
+   */
+  @Test
+  public void testRemoveMemberCalledAfterDoingFinalCheckOnCoordinator() {
+    MethodExecuted messageSent = new MethodExecuted();
+    when(messenger.send(isA(RemoveMemberMessage.class))).thenAnswer(messageSent); // member 1 will process
+    // preferred coordinators are 0 and 1; member 1 is the next preferred coordinator
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(1));
+
+    gmsHealthMonitor.installView(createView());
+
+    ArrayList<InternalDistributedMember> recipient = new ArrayList<InternalDistributedMember>();
+    recipient.add(mockMembers.get(0));
+    recipient.add(mockMembers.get(1));
+    // member 4 reports the coordinator (member 0) as not responding
+    gmsHealthMonitor.processMessage(createSuspectMembersMessage(mockMembers.get(4), recipient, mockMembers.get(0)));
+
+    Assert.assertTrue("RemoveMemberMessage should have sent.", messageSent.awaitExecution(MAX_WAIT_MILLIS));
+  }
+
+  /**
+   * Checks that checkIfAvailable() on a member that never answers waits for the ping timeout
+   * (~100 ms) and then reports the member as unavailable.
+   */
+  @Test
+  public void testCheckIfAvailable() {
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    // nanoTime is monotonic, unlike currentTimeMillis, so the elapsed time cannot jump
+    long startTime = System.nanoTime();
+    boolean retVal = gmsHealthMonitor.checkIfAvailable(mockMembers.get(1), "Not responding", false);
+    long timeTaken = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+
+    Assert.assertTrue("This should have taken member ping timeout 100ms ", timeTaken > 90);
+    Assert.assertFalse("CheckIfAvailable should have return false", retVal);
+  }
+
+  /**
+   * Checks that the monitor reports itself shut down after stop(). Note that tearDown() calls
+   * stop() a second time; that is assumed to be harmless.
+   */
+  @Test
+  public void testShutdown() {
+    when(joinLeave.getMemberID()).thenReturn(mockMembers.get(3)); // member 3 is the local member
+
+    gmsHealthMonitor.installView(createView());
+
+    gmsHealthMonitor.stop();
+
+    // give the monitor a moment in case part of the shutdown happens asynchronously
+    pause(100L);
+
+    Assert.assertTrue("HealthMonitor should have shutdown", gmsHealthMonitor.isShutdown());
+  }
+
+  /**
+   * Builds the view used by every test: created by member 0 (the coordinator), containing all
+   * mock members, with the constructor's two trailing member lists left empty.
+   */
+  private NetView createView() {
+    return new NetView(mockMembers.get(0), 2, mockMembers, new ArrayList<InternalDistributedMember>(), new ArrayList<InternalDistributedMember>());
+  }
+
+  /**
+   * Builds a SuspectMembersMessage addressed to {@code recipients} that reports {@code suspect}
+   * as "Not Responding", marked as sent by {@code sender}.
+   */
+  private static SuspectMembersMessage createSuspectMembersMessage(InternalDistributedMember sender,
+      ArrayList<InternalDistributedMember> recipients, InternalDistributedMember suspect) {
+    ArrayList<SuspectRequest> requests = new ArrayList<SuspectRequest>();
+    requests.add(new SuspectRequest(suspect, "Not Responding"));
+    SuspectMembersMessage message = new SuspectMembersMessage(recipients, requests);
+    message.setSender(sender);
+    return message;
+  }
+
+  /**
+   * Sleeps for {@code millis} milliseconds. If interrupted, the interrupt flag is restored rather
+   * than swallowed, and the method returns early.
+   */
+  private static void pause(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Mockito {@link Answer} that records whether the stubbed method has been invoked and always
+   * answers {@code null}. The invocation may happen on one of the health monitor's background
+   * threads (member-check or suspect thread) while the flag is read on the test thread, so it is
+   * backed by a {@link CountDownLatch}, which gives cross-thread visibility and lets a test block
+   * until the invocation happens.
+   */
+  private static class MethodExecuted implements Answer<Object> {
+    private final CountDownLatch executed = new CountDownLatch(1);
+
+    /** Returns true if the stubbed method has been invoked at least once. */
+    public boolean isMethodExecuted() {
+      return executed.getCount() == 0;
+    }
+
+    /**
+     * Waits up to {@code timeoutMillis} for the stubbed method to be invoked.
+     *
+     * @return true if it was invoked within the timeout
+     */
+    public boolean awaitExecution(long timeoutMillis) {
+      try {
+        return executed.await(timeoutMillis, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return isMethodExecuted();
+      }
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      executed.countDown();
+      return null;
+    }
+  }
+}


Mime
View raw message