geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/6] incubator-geode git commit: GEODE-1874: Checkin after code formatting refactor
Date Mon, 07 Nov 2016 20:22:08 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 3046ea831 -> 15450f5c3


GEODE-1874: Checkin after code formatting refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1661504f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1661504f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1661504f

Branch: refs/heads/develop
Commit: 1661504f2944870d59a8c545bba80dfc19a58b94
Parents: b0c6f05
Author: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Authored: Fri Oct 21 16:11:54 2016 -0700
Committer: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Committed: Tue Nov 8 07:09:19 2016 +1100

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 410 ++++++++++---------
 1 file changed, 227 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1661504f/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index b3598cd..ec1d606 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -1,22 +1,46 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.geode.distributed.internal.membership.gms.fd;
 
-import static org.apache.geode.internal.DataSerializableFixedID.*;
+import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
+import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
+import static org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.SystemConnectException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.NetView;
+import org.apache.geode.distributed.internal.membership.gms.GMSMember;
+import org.apache.geode.distributed.internal.membership.gms.Services;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
+import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
+import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
+import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
+import org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
+import org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
+import org.apache.geode.internal.ConnectionWatcher;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.logging.log4j.Logger;
+import org.jgroups.util.UUID;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -52,52 +76,24 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-import org.apache.logging.log4j.Logger;
-import org.jgroups.util.UUID;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.GemFireConfigException;
-import org.apache.geode.SystemConnectException;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DMStats;
-import org.apache.geode.distributed.internal.DistributionMessage;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.distributed.internal.membership.NetView;
-import org.apache.geode.distributed.internal.membership.gms.GMSMember;
-import org.apache.geode.distributed.internal.membership.gms.Services;
-import org.apache.geode.distributed.internal.membership.gms.interfaces.HealthMonitor;
-import org.apache.geode.distributed.internal.membership.gms.interfaces.MessageHandler;
-import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatMessage;
-import org.apache.geode.distributed.internal.membership.gms.messages.HeartbeatRequestMessage;
-import org.apache.geode.distributed.internal.membership.gms.messages.SuspectMembersMessage;
-import org.apache.geode.distributed.internal.membership.gms.messages.SuspectRequest;
-import org.apache.geode.internal.ConnectionWatcher;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
-
 /**
  * Failure Detection
  * <p>
- * This class make sure that each member is alive and communicating to this member.
- * To make sure that we create the ring of members based on current view. On this
- * ring, each member make sure that next-member in ring is communicating with it.
- * For that we record last message timestamp from next-member. And if it sees this
- * member has not communicated in last period(member-timeout) then we check whether
- * this member is still alive or not. Based on that we informed probable coordinators
- * to remove that member from view.
+ * This class make sure that each member is alive and communicating to this member. To make sure
+ * that we create the ring of members based on current view. On this ring, each member make sure
+ * that next-member in ring is communicating with it. For that we record last message timestamp from
+ * next-member. And if it sees this member has not communicated in last period(member-timeout) then
+ * we check whether this member is still alive or not. Based on that we informed probable
+ * coordinators to remove that member from view.
  * <p>
- * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
- * to initiate suspect processing for any member. First is checks whether the member is
- * responding or not. Then it informs probable coordinators to remove that member from
- * view.
+ * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used to initiate
+ * suspect processing for any member. First is checks whether the member is responding or
not. Then
+ * it informs probable coordinators to remove that member from view.
  * <p>
- * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
- * if that member is alive. Then based on removal flag it initiates the suspect processing
- * for that member.
+ * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see if that
member is
+ * alive. Then based on removal flag it initiates the suspect processing for that member.
  */
-@SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" })
+@SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems"})
 public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   private Services services;
@@ -114,22 +110,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   private static final Logger logger = Services.getLogger();
 
   /**
-   * The number of recipients of periodic heartbeats.  The recipients will
-   * be selected from the members that are likely to be monitoring this member.
+   * The number of recipients of periodic heartbeats. The recipients will be selected from
the
+   * members that are likely to be monitoring this member.
    */
   private static final int NUM_HEARTBEATS = Integer.getInteger("geode.heartbeat-recipients",
2);
 
   /**
-   * Member activity will be recorded per interval/period. Timer task will set interval's
starting time.
-   * Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be configured
+   * Member activity will be recorded per interval/period. Timer task will set interval's
starting
+   * time. Each interval will be member-timeout/LOGICAL_INTERVAL. LOGICAL_INTERVAL may be
configured
    * via a system property with a default of 2. At least 1 interval is needed.
    */
-  public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval",
2);
+  public static final int LOGICAL_INTERVAL =
+      Integer.getInteger("geode.logical-message-received-interval", 2);
 
   /**
    * stall time to wait for members leaving concurrently
    */
-  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval",
200);
+  public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL =
+      Long.getLong("geode.suspect-member-collection-interval", 200);
 
   private volatile long currentTimeStamp;
 
@@ -141,17 +139,20 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   /**
    * Timestamp at which we last had contact from a member
    */
-  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new
ConcurrentHashMap<>();
+  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
+      new ConcurrentHashMap<>();
 
   /**
    * Members currently being suspected and the view they were suspected in
    */
-  final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView
= new ConcurrentHashMap<>();
+  final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView
=
+      new ConcurrentHashMap<>();
 
   /**
    * Members undergoing final checks
    */
-  final private List<InternalDistributedMember> membersInFinalCheck = Collections.synchronizedList(new
ArrayList<>(30));
+  final private List<InternalDistributedMember> membersInFinalCheck =
+      Collections.synchronizedList(new ArrayList<>(30));
 
   /**
    * Replies to messages
@@ -215,9 +216,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /***
-   * This class sets start interval timestamp to record the activity of all members.
-   * That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
-   * record the activity of member.
+   * This class sets start interval timestamp to record the activity of all members. That
is used by
+   * {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to record the activity
of
+   * member.
    *
    * It initiates the suspect processing for next neighbour if it doesn't see any activity
from that
    * member in last interval(member-timeout)
@@ -240,7 +241,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       InternalDistributedMember neighbour = nextNeighbor;
 
       long currentTime = System.currentTimeMillis();
-      //this is the start of interval to record member activity
+      // this is the start of interval to record member activity
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
@@ -268,8 +269,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
 
   /***
    * Check thread waits on this object for response. It puts requestId in requestIdVsResponse
map.
-   * Response will have requestId, which is used to get ResponseObject. Then it is used to
-   * notify waiting thread.
+   * Response will have requestId, which is used to get ResponseObject. Then it is used to
notify
+   * waiting thread.
    */
   private class Response {
 
@@ -312,7 +313,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
         int myVmViewId = gmbr.getVmViewId();
         if (playingDead) {
           logger.debug("HealthMonitor: simulating sick member in health check");
-        } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == myUUID.getMostSignificantBits()
&& vmViewId == myVmViewId) {
+        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId ==
myVmViewId) {
           logger.debug("HealthMonitor: sending OK reply");
           out.write(OK);
           out.flush();
@@ -322,7 +324,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           logger.debug("HealthMonitor: server replied OK.");
         } else {
           if (logger.isDebugEnabled()) {
-            logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}", Long.toHexString(myUUID.getMostSignificantBits()), Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
+            logger.debug(
+                "HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}",
+                Long.toHexString(myUUID.getMostSignificantBits()),
+                Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs),
+                Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
           }
           out.write(ERROR);
           out.flush();
@@ -385,7 +391,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
 
-  private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember
mbr) {
+  private HeartbeatRequestMessage constructHeartbeatRequestMessage(
+      final InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
     final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
     hrm.setRecipient(mbr);
@@ -433,8 +440,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /**
-   * This method sends heartbeat request to other member and waits for member-timeout
-   * time for response. If it doesn't see response then it returns false.
+   * This method sends heartbeat request to other member and waits for member-timeout time
for
+   * response. If it doesn't see response then it returns false.
    */
   private boolean doCheckMember(InternalDistributedMember member, boolean waitForResponse)
{
     if (playingDead || beingSick) {
@@ -485,7 +492,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
         }
       }
     } catch (InterruptedException e) {
-      logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response
from member: {} .", member);
+      logger.debug(
+          "GMSHealthMonitor checking thread interrupted, while waiting for response from
member: {} .",
+          member);
     } finally {
       if (waitForResponse) {
         requestIdVsResponse.remove(hrm.getRequestId());
@@ -495,19 +504,22 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /**
-   * During final check, establish TCP connection between current member and suspect member.
-   * And exchange PING/PONG message to see if the suspect member is still alive.
-   *
+   * During final check, establish TCP connection between current member and suspect member.
And
+   * exchange PING/PONG message to see if the suspect member is still alive.
    * @param suspectMember member that does not respond to HeartbeatRequestMessage
-   *
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
     Socket clientSocket = null;
-    InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance();
+    InternalDistributedSystem internalDistributedSystem =
+        InternalDistributedSystem.getConnectedInstance();
     try {
-      logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
suspectMember.getInetAddress(), port);
-      clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(),
port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false,
-1, false);
+      logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
+          suspectMember.getInetAddress(), port);
+      clientSocket =
+          SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
+              .connect(suspectMember.getInetAddress(), port, (int) memberTimeout,
+                  new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1,
false);
       clientSocket.setTcpNoDelay(true);
       return doTCPCheckMember(suspectMember, clientSocket);
     } catch (IOException e) {
@@ -530,7 +542,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     return false;
   }
 
-  //Package protected for testing purposes
+  // Package protected for testing purposes
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, Socket clientSocket)
{
     try {
       if (clientSocket.isConnected()) {
@@ -544,7 +556,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
         logger.debug("Connected to suspect member - reading response");
         int b = in.read();
         if (logger.isDebugEnabled()) {
-          logger.debug("Received {}", (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown
response: " + b)));
+          logger.debug("Received {}",
+              (b == OK ? "OK" : (b == ERROR ? "ERROR" : "unknown response: " + b)));
         }
         if (b >= 0) {
           this.stats.incFinalCheckResponsesReceived();
@@ -557,7 +570,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           }
           return true;
         } else {
-          //received ERROR
+          // received ERROR
           return false;
         }
       } else {// cannot establish TCP connection with suspect member
@@ -584,19 +597,20 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   public void suspect(InternalDistributedMember mbr, String reason) {
     initiateSuspicion(mbr, reason);
     // Background suspect-collecting thread is currently disabled - it takes too long
-    //    synchronized (suspectRequests) {
-    //      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
-    //      if (!suspectRequests.contains(sr)) {
-    //        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-    //        suspectRequests.add(sr);
-    //        suspectRequests.notify();
-    //      }
-    //    }
+    // synchronized (suspectRequests) {
+    // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
+    // if (!suspectRequests.contains(sr)) {
+    // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+    // suspectRequests.add(sr);
+    // suspectRequests.notify();
+    // }
+    // }
   }
 
   @Override
   public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval)
{
-    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember)
mbr, reason);
+    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval,
+        (InternalDistributedMember) mbr, reason);
   }
 
   public void start() {
@@ -612,7 +626,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection thread
" + id);
+        Thread th =
+            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection thread " +
id);
         th.setDaemon(true);
         return th;
       }
@@ -621,16 +636,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     long delay = memberTimeout / LOGICAL_INTERVAL;
     monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
 
-    //    suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode
Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
-    //        new Callback<SuspectRequest>() {
-    //      @Override
-    //      public void process(List<SuspectRequest> requests) {
-    //        GMSHealthMonitor.this.sendSuspectRequest(requests);
+    // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode
Suspect
+    // Message Collector", Services.getThreadGroup(), suspectRequests,
+    // new Callback<SuspectRequest>() {
+    // @Override
+    // public void process(List<SuspectRequest> requests) {
+    // GMSHealthMonitor.this.sendSuspectRequest(requests);
     //
-    //      }
-    //    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-    //    suspectRequestCollectorThread.setDaemon(true);
-    //    suspectRequestCollectorThread.start()
+    // }
+    // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+    // suspectRequestCollectorThread.setDaemon(true);
+    // suspectRequestCollectorThread.start()
 
     serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
       final AtomicInteger threadIdx = new AtomicInteger();
@@ -638,7 +654,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       @Override
       public Thread newThread(Runnable r) {
         int id = threadIdx.getAndIncrement();
-        Thread th = new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Server
thread " + id);
+        Thread th =
+            new Thread(Services.getThreadGroup(), r, "Geode Failure Detection Server thread
" + id);
         th.setDaemon(true);
         return th;
       }
@@ -649,33 +666,44 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
     ServerSocket serverSocket;
     try {
-      serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress,
50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange,
false);
+      serverSocket = SocketCreatorFactory
+          .getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
+          .createServerSocketUsingPortRange(socketAddress, 50/* backlog */, true/* isBindAddress */,
+              false/* useNIO */, 65536/* tcpBufferSize */, portRange, false);
       socketPort = serverSocket.getLocalPort();
     } catch (IOException | SystemConnectException e) {
-      throw new GemFireConfigException("Unable to allocate a failure detection port in the
membership-port range", e);
+      throw new GemFireConfigException(
+          "Unable to allocate a failure detection port in the membership-port range", e);
     }
     return serverSocket;
   }
 
   /**
-   * start the thread that listens for tcp/ip connections and responds
-   * to connection attempts
+   * start the thread that listens for tcp/ip connections and responds to connection attempts
    */
   private void startTcpServer(ServerSocket ssocket) {
     // allocate a socket here so there are no race conditions between knowing the FD
     // socket port and joining the system
 
     serverSocketExecutor.execute(() -> {
-      logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(),
socketPort);
+      logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(),
+          socketPort);
       Socket socket = null;
       try {
-        while (!services.getCancelCriterion().isCancelInProgress() && !GMSHealthMonitor.this.isStopping)
{
+        while (!services.getCancelCriterion().isCancelInProgress()
+            && !GMSHealthMonitor.this.isStopping) {
           try {
             socket = ssocket.accept();
             if (GMSHealthMonitor.this.playingDead) {
               continue;
             }
-            serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start();  [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds
+            serverSocketExecutor.execute(new ClientSocketHandler(socket)); // start(); [bruce] I'm
+            // seeing a lot of
+            // failures due to this
+            // thread not being
+            // created fast enough,
+            // sometimes as long as
+            // 30 seconds
 
           } catch (RejectedExecutionException e) {
             // this can happen during shutdown
@@ -710,8 +738,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /**
-   * start the thread that periodically sends a message to processes
-   * that might be watching this process
+   * start the thread that periodically sends a message to processes that might be watching
this
+   * process
    */
   private void startHeartbeatThread() {
     checkExecutor.execute(new Runnable() {
@@ -795,22 +823,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     synchronized (viewVsSuspectedMembers) {
       viewVsSuspectedMembers.clear();
     }
-    for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator();
it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator();
it
+        .hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator();
it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator();
it
+        .hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    //    for (InternalDistributedMember mbr: newView.getMembers()) {
-    //      if (!memberVsLastMsgTS.containsKey(mbr)) {
-    //        CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
-    //        memberVsLastMsgTS.put(mbr, customTS);
-    //      }
-    //    }
+    // for (InternalDistributedMember mbr: newView.getMembers()) {
+    // if (!memberVsLastMsgTS.containsKey(mbr)) {
+    // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
+    // memberVsLastMsgTS.put(mbr, customTS);
+    // }
+    // }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
@@ -818,11 +848,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   /***
    * This method sets next neighbour which it needs to watch in current view.
    *
-   * if nextTo == null
-   * then it watches member next to it.
+   * if nextTo == null then it watches member next to it.
    *
-   * It becomes null when we suspect current neighbour, during that time it watches
-   * member next to suspect member.
+   * It becomes null when we suspect current neighbour, during that time it watches member
next to
+   * suspect member.
    */
   private synchronized void setNextNeighbor(NetView newView, InternalDistributedMember nextTo)
{
     if (newView == null) {
@@ -834,14 +863,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
     //
-    //    Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
-    //    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
-    //    checkAllSuspected.remove(localAddress);
-    //    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
-    //      logger.info("All other members are suspect at this point");
-    //      nextNeighbor = null;
-    //      return;
-    //    }
+    // Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
+    // checkAllSuspected.removeAll(suspectedMemberInView.keySet());
+    // checkAllSuspected.remove(localAddress);
+    // if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
+    // logger.info("All other members are suspect at this point");
+    // nextNeighbor = null;
+    // return;
+    // }
 
     if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size()
- 1) {
       boolean nonSuspectFound = false;
@@ -901,7 +930,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   @Override
   public void started() {
     setLocalAddress(services.getMessenger().getMemberID());
-    serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange());
+    serverSocket = createServerSocket(localAddress.getInetAddress(),
+        services.getConfig().getMembershipPortRange());
     startTcpServer(serverSocket);
     startHeartbeatThread();
   }
@@ -948,19 +978,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated()
? "terminated" : "not terminated"));
+      logger.info("GMSHealthMonitor serverSocketExecutor is "
+          + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
     }
 
-    //    if (suspectRequestCollectorThread != null) {
-    //      suspectRequestCollectorThread.shutdown();
-    //    }
+    // if (suspectRequestCollectorThread != null) {
+    // suspectRequestCollectorThread.shutdown();
+    // }
   }
 
   /***
    * test method
    */
   public boolean isShutdown() {
-    return scheduler.isShutdown() && checkExecutor.isShutdown() && serverSocketExecutor.isShutdown()
/*&& !suspectRequestCollectorThread.isAlive()*/;
+    return scheduler.isShutdown() && checkExecutor.isShutdown()
+        && serverSocketExecutor.isShutdown() /* && !suspectRequestCollectorThread.isAlive()
*/;
   }
 
   /**
@@ -976,7 +1008,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   @Override
-  public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember
suspect, String reason) {
+  public void memberSuspected(InternalDistributedMember initiator,
+                              InternalDistributedMember suspect, String reason) {
   }
 
   @Override
@@ -1067,7 +1100,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     this.stats.incHeartbeatsReceived();
     if (m.getRequestId() >= 0) {
       Response resp = requestIdVsResponse.get(m.getRequestId());
-      logger.trace("Got heartbeat from member {}. {}", m.getSender(), (resp != null ? "Check
thread still waiting" : "Check thread is not waiting"));
+      logger.trace("Got heartbeat from member {}. {}", m.getSender(),
+          (resp != null ? "Check thread still waiting" : "Check thread is not waiting"));
       if (resp != null) {
         synchronized (resp) {
           resp.setResponseMsg(m);
@@ -1076,15 +1110,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       }
 
     }
-    //we got heartbeat lets update timestamp
+    // we got heartbeat lets update timestamp
     contactedBy(m.getSender(), System.currentTimeMillis());
   }
 
   /**
-   * Process a Suspect request from another member. This may cause this member
-   * to become the new membership coordinator.
-   * it will to final check on that member and then it will send remove request
-   * for that member
+   * Process a Suspect request from another member. This may cause this member to become the new
+   * membership coordinator. it will to final check on that member and then it will send remove
+   * request for that member
    */
   private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) {
 
@@ -1101,8 +1134,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
     InternalDistributedMember sender = incomingRequest.getSender();
     int viewId = sender.getVmViewId();
     if (cv.getViewId() >= viewId && !cv.contains(incomingRequest.getSender()))
{
-      logger.info("Membership ignoring suspect request for " + incomingRequest + " from non-member " + incomingRequest.getSender());
-      services.getJoinLeave().remove(sender, "this process is initiating suspect processing but is no longer a member");
+      logger.info("Membership ignoring suspect request for " + incomingRequest + " from non-member "
+          + incomingRequest.getSender());
+      services.getJoinLeave().remove(sender,
+          "this process is initiating suspect processing but is no longer a member");
       return;
     }
 
@@ -1124,13 +1159,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       }
     }
 
-
     if (cv.getCoordinator().equals(localAddress)) {
       for (SuspectRequest req : incomingRequest.getMembers()) {
-        logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
req.getReason());
+        logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
+            req.getReason());
       }
       checkIfAvailable(sender, sMembers, cv);
-    }// coordinator ends
+    } // coordinator ends
     else {
 
       NetView check = new NetView(cv, cv.getViewId() + 1);
@@ -1148,7 +1183,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
         for (SuspectRequest req : incomingRequest.getMembers()) {
-          logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
req.getReason());
+          logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
+              req.getReason());
         }
         checkIfAvailable(sender, smbr, cv);
       } else {
@@ -1159,8 +1195,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /***
-   * This method make sure that records suspectRequest. We need to make sure this
-   * on preferred coordinators, as elder coordinator might be in suspected list next. 
+   * This method make sure that records suspectRequest. We need to make sure this on preferred
+   * coordinators, as elder coordinator might be in suspected list next.
    */
   private void recordSuspectRequests(List<SuspectRequest> sMembers, NetView cv) {
     // record suspect requests
@@ -1178,12 +1214,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
   }
 
   /**
-   * performs a "final" health check on the member.  If failure-detection
-   * socket information is available for the member (in the view) then
-   * we attempt to connect to its socket and ask if it's the expected member.
-   * Otherwise we send a heartbeat request and wait for a reply.
+   * performs a "final" health check on the member. If failure-detection socket information is
+   * available for the member (in the view) then we attempt to connect to its socket and ask if it's
+   * the expected member. Otherwise we send a heartbeat request and wait for a reply.
    */
-  private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest>
sMembers, final NetView cv) {
+  private void checkIfAvailable(final InternalDistributedMember initiator,
+                                List<SuspectRequest> sMembers, final NetView cv) {
 
     for (final SuspectRequest sr : sMembers) {
       final InternalDistributedMember mbr = sr.getSuspectMember();
@@ -1198,13 +1234,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
 
       // suspectMemberInView is now set by the heartbeat monitoring code
       // to allow us to move on from watching members we've already
-      // suspected.  Since that code is updating this collection we
+      // suspected. Since that code is updating this collection we
       // cannot use it here as an indication that a member is currently
       // undergoing a final check.
-      //      NetView view;
-      //      view = suspectedMemberInView.putIfAbsent(mbr, cv);
+      // NetView view;
+      // view = suspectedMemberInView.putIfAbsent(mbr, cv);
 
-      //      if (view == null || !view.equals(cv)) {
+      // if (view == null || !view.equals(cv)) {
       final String reason = sr.getReason();
       logger.debug("Scheduling final check for member {}; reason={}", mbr, reason);
       // its a coordinator
@@ -1219,11 +1255,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           GMSHealthMonitor.this.suspectedMemberInView.remove(mbr);
         }
       });
-      //      }// scheduling for final check and removing it..
+      // }// scheduling for final check and removing it..
     }
   }
 
-  private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, final
NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, final String reason)
{
+  private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator,
+                                         final NetView cv, boolean initiateRemoval,
+                                         final InternalDistributedMember mbr,
+                                         final String reason) {
 
     if (services.getJoinLeave().isMemberLeaving(mbr)) {
       return false;
@@ -1245,7 +1284,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
       if (port <= 0) {
         logger.info("Unable to locate failure detection port - requesting a heartbeat");
         if (logger.isDebugEnabled()) {
-          logger.debug("\ncurrent view: {}\nports: {}", cv, Arrays.toString(cv.getFailureDetectionPorts()));
+          logger.debug("\ncurrent view: {}\nports: {}", cv,
+              Arrays.toString(cv.getFailureDetectionPorts()));
         }
         pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
         GMSHealthMonitor.this.stats.incFinalCheckRequestsSent();
@@ -1255,9 +1295,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           GMSHealthMonitor.this.stats.incUdpFinalCheckResponsesReceived();
         }
       } else {
-        //this will just send heartbeat request, it will not wait for response
-        //if we will get heartbeat then it will change the timestamp, which we are 
-        //checking below in case of tcp check failure..
+        // this will just send heartbeat request, it will not wait for response
+        // if we will get heartbeat then it will change the timestamp, which we are
+        // checking below in case of tcp check failure..
         doCheckMember(mbr, false);
         pinged = doTCPCheckMember(mbr, port);
       }
@@ -1271,7 +1311,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
           }
           failed = true;
         } else {
-          logger.info("Final check failed but detected recent message traffic for suspect
member " + mbr);
+          logger.info(
+              "Final check failed but detected recent message traffic for suspect member " + mbr);
         }
       }
       if (!failed) {
@@ -1297,25 +1338,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler
{
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
     // the background suspect-collector thread is currently disabled
-    //    synchronized (suspectRequests) {
-    //      if (suspectRequests.size() > 0) {
-    //        for (SuspectRequest sr: suspectRequests) {
-    //          if (!requests.contains(sr)) {
-    //            requests.add(sr);
-    //          }
-    //        }
-    //        suspectRequests.clear();
-    //      }
-    //    }
+    // synchronized (suspectRequests) {
+    // if (suspectRequests.size() > 0) {
+    // for (SuspectRequest sr: suspectRequests) {
+    // if (!requests.contains(sr)) {
+    // requests.add(sr);
+    // }
+    // }
+    // suspectRequests.clear();
+    // }
+    // }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
     if (currentView.size() > 4) {
       HashSet<InternalDistributedMember> filter = new HashSet<>();
-      for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements(); ) {
+      for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e
+          .hasMoreElements(); ) {
         filter.add(e.nextElement());
       }
-      filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
-      recipients = currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
+      filter.addAll(
+          requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
+      recipients =
+          currentView.getPreferredCoordinators(filter, services.getJoinLeave().getMemberID(), 5);
     } else {
       recipients = currentView.getMembers();
     }



Mime
View raw message