geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [2/2] incubator-geode git commit: Fixing problems with network partition detection
Date Fri, 12 Feb 2016 16:10:22 GMT
Fixing problems with network partition detection

Recent changes in GMSJoinLeave and GMSHealthMonitor destabilized network
partition detection to the point that it was barely working.  This commit
corrects the faults that were causing the failures: exponential barrages of
suspect processing, and failure to detect the loss of members during
membership view creation.

GMSJoinLeave's removeHealthyMembers and filterMembers methods had problems
that caused unhealthy members to go undetected.  GMSHealthMonitor had no
barriers to prevent concurrent suspect processing from being initiated for
the same member.  JGroupsMessenger was initiating final checks on members it
was unable to send a message to, instead of merely raising suspicion.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/db654a78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/db654a78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/db654a78

Branch: refs/heads/develop
Commit: db654a7897f70d7f4c72c22f22c19e6c565e7990
Parents: 6f0a329
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Fri Feb 12 07:55:56 2016 -0800
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Fri Feb 12 08:09:19 2016 -0800

----------------------------------------------------------------------
 .../internal/membership/NetView.java            |   3 +-
 .../membership/gms/fd/GMSHealthMonitor.java     | 217 ++++++++++---------
 .../membership/gms/interfaces/JoinLeave.java    |   6 +
 .../membership/gms/membership/GMSJoinLeave.java | 122 +++++++----
 .../gms/messenger/JGroupsMessenger.java         |   4 +-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  16 ++
 .../messenger/JGroupsMessengerJUnitTest.java    |   4 +-
 7 files changed, 229 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
index b0ddcc0..40f5f71 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
@@ -261,7 +262,7 @@ public class NetView implements DataSerializableFixedID {
     }
   }
 
-  public boolean contains(InternalDistributedMember mbr) {
+  public boolean contains(DistributedMember mbr) {
     return this.hashedMembers.contains(mbr);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index b20fe03..beb781d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -141,10 +141,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
    */
   final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView
= new ConcurrentHashMap<>();
   
-//  /**
-//   * Members undergoing final checks
-//   */
-//  final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new
ArrayList<>(30));
+  /**
+   * Members undergoing final checks
+   */
+  final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new
ArrayList<>(30));
 
   /**
    * Replies to messages
@@ -334,7 +334,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           }
         }
       } catch (IOException e) {
-        logger.debug("Unexpected exception", e);
+        // this is expected if it is a connection-timeout or other failure
+        // to connect
       } catch (RuntimeException e) {
         logger.debug("Unexpected runtime exception", e);
         throw e;
@@ -346,7 +347,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           try {
             socket.close();
           } catch (IOException e) {
-            logger.info("Unexpected exception", e);
+            // expected if the socket is already closed
           }
         }
       }
@@ -361,14 +362,22 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /*
-   * It records the member activity for current time interval.
+   * Record the member activity for current time interval.
    */
   @Override
   public void contactedBy(InternalDistributedMember sender) {
-    TimeStamp cTS = new TimeStamp(currentTimeStamp);
+    contactedBy(sender, currentTimeStamp);
+  }
+  
+  
+  /**
+   * Record member activity at a specified time
+   */
+  private void contactedBy(InternalDistributedMember sender, long timeStamp) {
+    TimeStamp cTS = new TimeStamp(timeStamp);
     cTS = memberTimeStamps.putIfAbsent(sender, cTS);
     if (cTS != null) {
-      cTS.setTimeStamp(currentTimeStamp);
+      cTS.setTimeStamp(timeStamp);
     }
     if (suspectedMemberInView.remove(sender) != null) {
       logger.info("No longer suspecting {}", sender);
@@ -376,6 +385,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     setNextNeighbor(currentView, null);
   }
 
+  
   private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember
mbr) {
     final int reqId = requestId.getAndIncrement();
     final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
@@ -413,6 +423,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   private void initiateSuspicion(InternalDistributedMember mbr, String reason) {
+    if (services.getJoinLeave().isMemberLeaving(mbr)) {
+      return;
+    }
     SuspectRequest sr = new SuspectRequest(mbr, reason);
     List<SuspectRequest> sl = new ArrayList<SuspectRequest>();
     sl.add(sr);
@@ -432,6 +445,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       // messages by returning true.
       return true;
     }
+    long startTime = System.currentTimeMillis();
     logger.trace("Checking member {}", member);
     final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
     final Response pingResp = new Response();
@@ -448,6 +462,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
             pingResp.wait(memberTimeout);
           }
           TimeStamp ts = memberTimeStamps.get(member);
+          if (ts != null && ts.getTime() > startTime) {
+            return true;
+          }
           if (pingResp.getResponseMsg() == null) {
             if (isStopping) {
               return true;
@@ -489,7 +506,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       return doTCPCheckMember(suspectMember, clientSocket);
     }
     catch (IOException e) {
-      logger.debug("Unexpected exception", e);
+      // this is expected if it is a connection-timeout or other failure
+      // to connect
     } 
     finally {
       try {
@@ -498,7 +516,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           clientSocket.close();
         }
       } catch (IOException e) {
-        logger.trace("Unexpected exception", e);
+        // expected
       }
     }
     return false;
@@ -518,9 +536,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
         this.stats.incTcpFinalCheckRequestsSent();
         logger.debug("Connected - reading response from suspect member {}", suspectMember);
         int b = in.read();
-        this.stats.incFinalCheckResponsesReceived();
-        this.stats.incTcpFinalCheckResponsesReceived();
         logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : b)), suspectMember);
+        if (b >= 0) {
+          this.stats.incFinalCheckResponsesReceived();
+          this.stats.incTcpFinalCheckResponsesReceived();
+        }
         if (b == OK) {
           TimeStamp ts = memberTimeStamps.get(suspectMember);
           if (ts != null) {
@@ -1030,7 +1050,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     this.stats.incHeartbeatsReceived();
     if (m.getRequestId() < 0) {
       // a periodic heartbeat
-      contactedBy(m.getSender());
+      contactedBy(m.getSender(), System.currentTimeMillis());
     } else {
       Response resp = requestIdVsResponse.get(m.getRequestId());
       logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != null ? "Check
thread still waiting" : "Check thread is not waiting"));
@@ -1162,56 +1182,49 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   private void checkIfAvailable(final InternalDistributedMember initiator,
       List<SuspectRequest> sMembers, final NetView cv) {
 
-//    List<InternalDistributedMember> membersChecked = new ArrayList<>(10);
-    try {
-      for (int i = 0; i < sMembers.size(); i++) {
-        final SuspectRequest sr = sMembers.get(i);
-        final InternalDistributedMember mbr = sr.getSuspectMember();
+    for (int i = 0; i < sMembers.size(); i++) {
+      final SuspectRequest sr = sMembers.get(i);
+      final InternalDistributedMember mbr = sr.getSuspectMember();
 
-        if (!cv.contains(mbr) /*|| membersInFinalCheck.contains(mbr)*/) {
-          continue;
-        }
+      if (!cv.contains(mbr) || membersInFinalCheck.contains(mbr)) {
+        continue;
+      }
 
-        if (mbr.equals(localAddress)) {
-          continue;// self
-        }
-        
-//        membersChecked.add(mbr);
-
-        // suspectMemberInView is now set by the heartbeat monitoring code
-        // to allow us to move on from watching members we've already
-        // suspected.  Since that code is updating this collection we
-        // cannot use it here as an indication that a member is currently
-        // undergoing a final check.
-        //      NetView view;
-        //      view = suspectedMemberInView.putIfAbsent(mbr, cv);
-
-        //      if (view == null || !view.equals(cv)) {
-        final String reason = sr.getReason();
-        logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
-        // its a coordinator
-        checkExecutor.execute(new Runnable() {
-
-          @Override
-          public void run() {
-            try {
-              inlineCheckIfAvailable(initiator, cv, true, mbr,
-                  reason);
-            } catch (DistributedSystemDisconnectedException e) {
-              return;
-            } catch (Exception e) {
-              logger.info("Unexpected exception while verifying member", e);
-            } finally {
-              GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
-            }
+      if (mbr.equals(localAddress)) {
+        continue;// self
+      }
+
+      // suspectedMemberInView is now set by the heartbeat monitoring code
+      // to allow us to move on from watching members we've already
+      // suspected.  Since that code is updating this collection we
+      // cannot use it here as an indication that a member is currently
+      // undergoing a final check.
+      //      NetView view;
+      //      view = suspectedMemberInView.putIfAbsent(mbr, cv);
+
+      //      if (view == null || !view.equals(cv)) {
+      final String reason = sr.getReason();
+      logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
+      // its a coordinator
+      checkExecutor.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            inlineCheckIfAvailable(initiator, cv, true, mbr,
+                reason);
+          } catch (DistributedSystemDisconnectedException e) {
+            return;
+          } catch (Exception e) {
+            logger.info("Unexpected exception while verifying member", e);
+          } finally {
+            GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
           }
+        }
 
-          
-        });
-        //      }// scheduling for final check and removing it..
-      }
-    } finally {
-//      membersInFinalCheck.removeAll(membersChecked);
+
+      });
+      //      }// scheduling for final check and removing it..
     }
   }
 
@@ -1220,50 +1233,60 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       boolean initiateRemoval,
       final InternalDistributedMember mbr, final String reason) {
 
-    services.memberSuspected(initiator, mbr, reason);
-    long startTime = System.currentTimeMillis();
-    // for some reason we used to update the timestamp for the member
-    // with the startTime, but we don't want to do that because it looks
-    // like a heartbeat has been received
-
-    logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
-    boolean pinged;
-    int port = cv.getFailureDetectionPort(mbr);
-    if (port <= 0) {
-      logger.info("Unable to locate failure detection port - requesting a heartbeat");
-      if (logger.isDebugEnabled()) {
-        logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
-      }
-      pinged = GMSHealthMonitor.this.doCheckMember(mbr);
-      GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
-      GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
-      if (pinged) {
-        GMSHealthMonitor.this.stats.incFinalCheckResponsesReceived();
-        GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
-      }
-    } else {
-      pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
+    if (services.getJoinLeave().isMemberLeaving(mbr)) {
+      return false;
     }
 
     boolean failed = false;
-    if (!pinged && !isStopping) {
-      TimeStamp ts = memberTimeStamps.get(mbr);
-      if (ts == null || ts.getTime() <= startTime) {
-        logger.info("Final check failed - requesting removal of suspect member " + mbr);
-        if (initiateRemoval) {
-          services.getJoinLeave().remove(mbr, reason);
+
+    membersInFinalCheck.add(mbr);
+    try {
+      services.memberSuspected(initiator, mbr, reason);
+      long startTime = System.currentTimeMillis();
+      // for some reason we used to update the timestamp for the member
+      // with the startTime, but we don't want to do that because it looks
+      // like a heartbeat has been received
+  
+      logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
+      boolean pinged;
+      int port = cv.getFailureDetectionPort(mbr);
+      if (port <= 0) {
+        logger.info("Unable to locate failure detection port - requesting a heartbeat");
+        if (logger.isDebugEnabled()) {
+          logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
+        }
+        pinged = GMSHealthMonitor.this.doCheckMember(mbr);
+        GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
+        GMSHealthMonitor.this.stats.incUdpFinalCheckRequestsSent();
+        if (pinged) {
+          GMSHealthMonitor.this.stats.incFinalCheckResponsesReceived();
+          GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
-        failed = true;
       } else {
-        logger.info("Final check failed but detected recent message traffic for suspect member
" + mbr);
+        pinged = GMSHealthMonitor.this.doTCPCheckMember(mbr, port);
       }
+  
+      if (!pinged && !isStopping) {
+        TimeStamp ts = memberTimeStamps.get(mbr);
+        if (ts == null || ts.getTime() <= startTime) {
+          logger.info("Final check failed - requesting removal of suspect member " + mbr);
+          if (initiateRemoval) {
+            services.getJoinLeave().remove(mbr, reason);
+          }
+          failed = true;
+        } else {
+          logger.info("Final check failed but detected recent message traffic for suspect
member " + mbr);
+        }
+      }
+      if (!failed) {
+        logger.info("Final check passed for suspect member " + mbr);
+      }
+      // whether it's alive or not, at this point we allow it to
+      // be watched again
+      suspectedMemberInView.remove(mbr);
+    } finally {
+      membersInFinalCheck.remove(mbr);
     }
-    if (!failed) {
-      logger.info("Final check passed for suspect member " + mbr);
-    }
-    // whether it's alive or not, at this point we allow it to
-    // be watched again
-    suspectedMemberInView.remove(mbr);
     return !failed;
   }
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
index 3f2d847..87409c5 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/JoinLeave.java
@@ -61,6 +61,12 @@ public interface JoinLeave extends Service {
   NetView getPreviousView();
   
   /**
+   * check to see if a member is already in the process of leaving or
+   * being removed (in the next view)
+   */
+  boolean isMemberLeaving(DistributedMember mbr);
+  
+  /**
    * test hook
    */
   void disableDisconnectOnQuorumLossForTesting();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index 0f16ba9..5d34041 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -132,6 +132,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   /** the previous view **/
   private volatile NetView previousView;
 
+  /** members who have been declared dead in the current view */
   private final Set<InternalDistributedMember> removedMembers = new HashSet<>();
 
   /** members who we've received a leave message from **/
@@ -390,6 +391,26 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
     return response;
   }
+  
+  @Override
+  public boolean isMemberLeaving(DistributedMember mbr) {
+    if (getPendingRequestIDs(LEAVE_REQUEST_MESSAGE).contains(mbr)
+        || getPendingRequestIDs(REMOVE_MEMBER_REQUEST).contains(mbr)
+        || !currentView.contains(mbr)) {
+      return true;
+    }
+    synchronized(removedMembers) {
+      if (removedMembers.contains(mbr)) {
+        return true;
+      }
+    }
+    synchronized(leftMembers) {
+      if (leftMembers.contains(mbr)) {
+        return true;
+      }
+    }
+    return false;
+  }
 
   /**
    * process a join request from another member. If this is the coordinator
@@ -465,8 +486,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
 
     if (incomingRequest.getMemberID().equals(this.localAddress)) {
-      logger.info("I am being told to leave the distributed system");
+      logger.info("I am being told to leave the distributed system by {}", incomingRequest.getSender());
       forceDisconnect(incomingRequest.getReason());
+      return;
     }
 
     if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress())
{
@@ -504,6 +526,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
    */
   private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
     NetView v = currentView;
+    boolean fromMe = incomingRequest.getSender() == null ||
+        incomingRequest.getSender().equals(localAddress);
 
     InternalDistributedMember mbr = incomingRequest.getMemberID();
 
@@ -517,9 +541,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       return;
     }
 
-    logger.info("Membership received a request to remove " + mbr
+    if (!fromMe) {
+      logger.info("Membership received a request to remove " + mbr
         + " from " + incomingRequest.getSender() 
         + " reason="+incomingRequest.getReason());
+    }
 
     if (mbr.equals(this.localAddress)) {
       // oops - I've been kicked out
@@ -528,7 +554,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
 
     if (getPendingRequestIDs(REMOVE_MEMBER_REQUEST).contains(mbr)) {
-      logger.debug("ignoring request as I already have a removal request for this member");
+      logger.debug("ignoring removal request as I already have a removal request for this
member");
       return;
     }
 
@@ -805,6 +831,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     } else { // !preparing
       if (isJoined && currentView != null && !view.contains(this.localAddress))
{
+        logger.fatal("This member is no longer in the membership view.  My ID is {} and the
new view is {}", localAddress, view);
         forceDisconnect("This node is no longer in the membership view");
       } else {
         if (!m.isRebroadcast()) { // no need to ack a rebroadcast view
@@ -2066,6 +2093,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           }
         }
 
+        logger.debug("unresponsive members that could not be reached: {}", unresponsive);
+        
         List<InternalDistributedMember> failures = new ArrayList<>(currentView.getCrashedMembers().size()
+ unresponsive.size());
 
         if (conflictingView != null && !conflictingView.getCreator().equals(localAddress)
&& conflictingView.getViewId() > newView.getViewId()
@@ -2149,42 +2178,51 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
      * performs health checks on the collection of members, removing any that
      * are found to be healthy
      * 
-     * @param mbrs
+     * @param suspects
      */
-    private void removeHealthyMembers(final Collection<InternalDistributedMember> mbrs)
throws InterruptedException {
-      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size());
+    private void removeHealthyMembers(final Set<InternalDistributedMember> suspects)
throws InterruptedException {
+      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(suspects.size());
 
       Set<InternalDistributedMember> newRemovals = new HashSet<>();
       Set<InternalDistributedMember> newLeaves = new HashSet<>();
 
-      filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
-      filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);   
+      filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST);
+      filterMembers(suspects, newLeaves, LEAVE_REQUEST_MESSAGE);
+      newRemovals.removeAll(newLeaves);  // if we received a Leave req the member is "healthy"

       
-      for (InternalDistributedMember mbr : mbrs) {
+      suspects.removeAll(newLeaves);
+      
+      for (InternalDistributedMember mbr : suspects) {
+        if (newRemovals.contains(mbr) || newLeaves.contains(mbr)) {
+          continue; // no need to check this member - it's already been checked or is leaving
+        }
         checkers.add(new Callable<InternalDistributedMember>() {
           @Override
           public InternalDistributedMember call() throws Exception {
-            // return the member id if it fails health checks
             boolean available = GMSJoinLeave.this.checkIfAvailable(mbr);
             
             synchronized (viewRequests) {
               if (available) {
-                mbrs.remove(mbr);
+                suspects.remove(mbr);
               }
               viewRequests.notifyAll();
             }
             return mbr;
           }
+          @Override
+          public String toString() {
+            return mbr.toString();
+          }
         });
       }
-
-      mbrs.removeAll(newLeaves);
-
-      if (mbrs.isEmpty()) {
+      
+      if (checkers.isEmpty()) {
+        logger.debug("all unresponsive members are already scheduled to be removed");
         return;
       }
-      
-      ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory()
{
+
+      logger.debug("checking availability of these members: {}", checkers);
+      ExecutorService svc = Executors.newFixedThreadPool(suspects.size(), new ThreadFactory()
{
         AtomicInteger i = new AtomicInteger();
 
         @Override
@@ -2196,17 +2234,22 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
       try {
         long giveUpTime = System.currentTimeMillis() + viewAckTimeout;
-        List<Future<InternalDistributedMember>> futures;
-        futures = submitAll(svc, checkers);
+        // submit the tasks that will remove dead members from the suspects collection
+        submitAll(svc, checkers);
+        
+        // now wait for the tasks to do their work
         long waitTime = giveUpTime - System.currentTimeMillis();
         synchronized (viewRequests) {
-          while(waitTime>0 ) {
-            logger.debug("removeHealthyMembers: mbrs" + mbrs.size());
+          while ( waitTime > 0 ) {
+            logger.debug("removeHealthyMembers: mbrs" + suspects.size());
             
-            filterMembers(mbrs, newRemovals, REMOVE_MEMBER_REQUEST);
-            filterMembers(mbrs, newLeaves, LEAVE_REQUEST_MESSAGE);   
+            filterMembers(suspects, newRemovals, REMOVE_MEMBER_REQUEST);
+            filterMembers(suspects, newLeaves, LEAVE_REQUEST_MESSAGE);
+            newRemovals.removeAll(newLeaves);
             
-            if(mbrs.isEmpty()) {
+            suspects.removeAll(newLeaves);
+            
+            if(suspects.isEmpty() || newRemovals.containsAll(suspects)) {
               break;
             }
             
@@ -2214,31 +2257,28 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             waitTime = giveUpTime - System.currentTimeMillis();
           }
         }
-        
-        //we have waited for all members, now check if we considered any removeRequest;
-        //add them back to create new view
-        if(!newRemovals.isEmpty()) {
-          newRemovals.removeAll(newLeaves);
-          mbrs.addAll(newRemovals);
-        }
-        
       } finally {
         svc.shutdownNow();
       }
     }
 
-    protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember>
removalRequestForMembers, short requestType) {
-      Set<InternalDistributedMember> gotRemovalRequests = getPendingRequestIDs(requestType);
+    /**
+     * This gets pending requests and returns the IDs of any that are in the given collection
+     * @param mbrs collection of IDs to search for
+     * @param matchingMembers collection to store matching IDs in
+     * @param requestType leave/remove/join
+     */
+    protected void filterMembers(Collection<InternalDistributedMember> mbrs, Set<InternalDistributedMember>
matchingMembers, short requestType) {
+      Set<InternalDistributedMember> requests = getPendingRequestIDs(requestType);
       
-      if(!gotRemovalRequests.isEmpty()) {
-        logger.debug("removeHealthyMembers: gotRemovalRequests " + gotRemovalRequests.size());
-        Iterator<InternalDistributedMember> itr = gotRemovalRequests.iterator();
+      if(!requests.isEmpty()) {
+        logger.debug("filterMembers: processing " + requests.size() + " requests for type
" + requestType);
+        Iterator<InternalDistributedMember> itr = requests.iterator();
         while(itr.hasNext()) {
-          InternalDistributedMember removeMember = itr.next();
-          if(mbrs.contains(removeMember)) {
+          InternalDistributedMember memberID = itr.next();
+          if(mbrs.contains(memberID)) {
             testFlagForRemovalRequest = true;
-            removalRequestForMembers.add(removeMember);
-            mbrs.remove(removeMember);
+            matchingMembers.add(memberID);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index be2c405..3bd1e83 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -401,8 +401,8 @@ public class JGroupsMessenger implements Messenger {
         }
       }
       if (recipient != null) {
-        services.getHealthMonitor().checkIfAvailable(recipient,
-            "Unable to send messages to this member via JGroups", true);
+        services.getHealthMonitor().suspect(recipient,
+            "Unable to send messages to this member via JGroups");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 5b58c27..79aa02a 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -303,6 +303,22 @@ public class GMSJoinLeaveJUnitTest {
   }
   
   @Test
+  public void testIsMemberLeaving() throws Exception {
+    initMocks();
+    prepareAndInstallView(mockMembers[0], createMemberList(mockMembers[0], mockMembers[1],
gmsJoinLeaveMemberId));
+    MethodExecuted removeMessageSent = new MethodExecuted();
+    when(messenger.send(any(RemoveMemberMessage.class))).thenAnswer(removeMessageSent);
+    assertFalse(gmsJoinLeave.isMemberLeaving(mockMembers[0]));
+    assertFalse(gmsJoinLeave.isMemberLeaving(mockMembers[1]));
+    gmsJoinLeave.remove(mockMembers[0], "removing for test");
+    assertTrue(gmsJoinLeave.isMemberLeaving(mockMembers[0]));
+    LeaveRequestMessage msg = new LeaveRequestMessage(gmsJoinLeave.getMemberID(), mockMembers[1],
"leaving for test");
+    msg.setSender(mockMembers[1]);
+    gmsJoinLeave.processMessage(msg);
+    assertTrue(gmsJoinLeave.isMemberLeaving(mockMembers[1]));
+  }
+  
+  @Test
   public void testRemoveAndLeaveIsNotACrash() throws Exception {
     // simultaneous leave & remove requests for a member
     // should not result in it's being seen as a crashed member

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/db654a78/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
index 805dd88..f30efc8 100755
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessengerJUnitTest.java
@@ -164,7 +164,7 @@ public class JGroupsMessengerJUnitTest {
     when(joinLeave.getView()).thenReturn(v);
     messenger.installView(v);
     messenger.handleJGroupsIOException(new IOException("je m'en fiche"), new JGAddress(v.getMembers().get(1)));
-    verify(healthMonitor).checkIfAvailable(isA(InternalDistributedMember.class), isA(String.class),
isA(Boolean.class));
+    verify(healthMonitor).suspect(isA(InternalDistributedMember.class), isA(String.class));
   }
   
   @Test
@@ -742,7 +742,7 @@ public class JGroupsMessengerJUnitTest {
     IOException ioe = new IOException("test exception");
     messenger.handleJGroupsIOException(ioe, new JGAddress(mbr));
     messenger.handleJGroupsIOException(ioe, new JGAddress(mbr)); // should be ignored
-    verify(healthMonitor).checkIfAvailable(mbr, "Unable to send messages to this member via
JGroups", true);
+    verify(healthMonitor).suspect(mbr, "Unable to send messages to this member via JGroups");
   }
   
   @Test


Mime
View raw message