geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wireba...@apache.org
Subject [geode] branch develop updated: GEODE-4094: ClientHealthMonitor may cause an NPE in a ServerConnection (#1326)
Date Fri, 26 Jan 2018 22:22:49 GMT
This is an automated email from the ASF dual-hosted git repository.

wirebaron pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2f81f40  GEODE-4094: ClientHealthMonitor may cause an NPE in a ServerConnection (#1326)
2f81f40 is described below

commit 2f81f40727cfd02296e9e0f041b6ae2bb9cd1a45
Author: Brian Rowe <browe@pivotal.io>
AuthorDate: Fri Jan 26 14:22:46 2018 -0800

    GEODE-4094: ClientHealthMonitor may cause an NPE in a ServerConnection (#1326)
    
    * GEODE-4094: ClientHealthMonitor may cause an NPE in a ServerConnection
    
    - minor refactoring of AcceptorImpl and Handshake to improve testability
    - added a unit test to demonstrate race condition
    - refactored connection map into a new object to prevent race
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  |   7 -
 .../cache/tier/sockets/ClientHealthMonitor.java    | 253 +++++++++------------
 .../internal/cache/tier/sockets/HandShake.java     |   2 +-
 .../cache/tier/sockets/ServerConnection.java       |  57 +++--
 .../tier/sockets/ServerConnectionCollection.java   |  44 ++++
 .../tier/sockets/ServerHandShakeProcessor.java     |   8 +-
 .../cache/tier/sockets/ServerConnectionTest.java   | 139 ++++++++++-
 7 files changed, 318 insertions(+), 192 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 360fa48..010e07c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -301,7 +301,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool
{
   private long acceptorId;
 
   private static boolean isAuthenticationRequired;
-  private static boolean isIntegratedSecurity;
 
   private static boolean isPostAuthzCallbackPresent;
 
@@ -547,8 +546,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool
{
 
     isAuthenticationRequired = this.securityService.isClientSecurityRequired();
 
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
     String postAuthzFactoryName =
         this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
 
@@ -1784,10 +1781,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool
{
     return isAuthenticationRequired;
   }
 
-  public static boolean isIntegratedSecurity() {
-    return isIntegratedSecurity;
-  }
-
   public static boolean isPostAuthzCallbackPresent() {
     return isPostAuthzCallbackPresent;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index afefb69..d08902d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -70,16 +69,6 @@ public class ClientHealthMonitor {
   private final Object _clientHeartbeatsLock = new Object();
 
   /**
-   * The map of known client threads
-   */
-  private final Map _clientThreads;
-
-  /**
-   * An object used to lock the map of client threads
-   */
-  private final Object _clientThreadsLock = new Object();
-
-  /**
    * THe GemFire <code>Cache</code>
    */
   private final InternalCache _cache;
@@ -124,6 +113,12 @@ public class ClientHealthMonitor {
   private final HashMap cleanupProxyIdTable = new HashMap();
 
   /**
+   * Used to track the connections for a particular client
+   */
+  private final HashMap<ClientProxyMembershipID, ServerConnectionCollection> proxyIdConnections
=
+      new HashMap<>();
+
+  /**
    * Gives, version-wise, the number of clients connected to the cache servers in this cache,
which
    * are capable of processing recieved deltas.
    *
@@ -349,18 +344,12 @@ public class ClientHealthMonitor {
    * @param proxyID The membership id of the client to be updated
    * @param connection The thread processing client requests
    */
-  public void addConnection(ClientProxyMembershipID proxyID, ServerConnection connection)
{
-    // logger.info("ClientHealthMonitor: Adding " + connection + " to
-    // client with member id " + proxyID);
-    synchronized (_clientThreadsLock) {
-      Set serverConnections = (Set) this._clientThreads.get(proxyID);
-      if (serverConnections == null) {
-        serverConnections = new HashSet();
-        this._clientThreads.put(proxyID, serverConnections);
-      }
-      serverConnections.add(connection);
-      // logger.info("ClientHealthMonitor: The client with member id " +
-      // proxyID + " contains " + serverConnections.size() + " threads");
+  public ServerConnectionCollection addConnection(ClientProxyMembershipID proxyID,
+      ServerConnection connection) {
+    synchronized (proxyIdConnections) {
+      ServerConnectionCollection collection = getProxyIdCollection(proxyID);
+      collection.addConnection(connection);
+      return collection;
     }
   }
 
@@ -371,18 +360,12 @@ public class ClientHealthMonitor {
    * @param connection The thread processing client requests
    */
   public void removeConnection(ClientProxyMembershipID proxyID, ServerConnection connection)
{
-    // logger.info("ClientHealthMonitor: Removing " + connection + " from
-    // client with member id " + proxyID);
-    synchronized (_clientThreadsLock) {
-      Set serverConnections = (Set) this._clientThreads.get(proxyID);
-      if (serverConnections != null) { // fix for bug 35343
-        serverConnections.remove(connection);
-        // logger.info("ClientHealthMonitor: The client with member id " +
-        // proxyID + " contains " + serverConnections.size() + " threads");
-        if (serverConnections.isEmpty()) {
-          // logger.info("ClientHealthMonitor: The client with member id "
-          // + proxyID + " is being removed since it contains 0 threads");
-          this._clientThreads.remove(proxyID);
+    synchronized (proxyIdConnections) {
+      ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
+      if (collection != null) {
+        collection.removeConnection(connection);
+        if (collection.getConnections().isEmpty()) {
+          proxyIdConnections.remove(proxyID);
         }
       }
     }
@@ -419,24 +402,21 @@ public class ClientHealthMonitor {
    *        ConnectionProxies may be from same client member or different. If it is null
this would
    *        mean to fetch the Connections of all the ConnectionProxy objects.
    */
-  public Map getConnectedClients(Set filterProxies) {
-    Map map = new HashMap(); // KEY=proxyID, VALUE=connectionCount (Integer)
-    synchronized (_clientThreadsLock) {
-      Iterator connectedClients = this._clientThreads.entrySet().iterator();
-      while (connectedClients.hasNext()) {
-        Map.Entry entry = (Map.Entry) connectedClients.next();
-        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();// proxyID
-                                                                                   // includes
FQDN
+  public Map<String, Object[]> getConnectedClients(Set filterProxies) {
+    Map<String, Object[]> map = new HashMap<>(); // KEY=proxyID, VALUE=connectionCount
(Integer)
+    synchronized (proxyIdConnections) {
+      for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
+          .entrySet()) {
+        ClientProxyMembershipID proxyID = entry.getKey();// proxyID
+        // includes FQDN
         if (filterProxies == null || filterProxies.contains(proxyID)) {
           String membershipID = null;
-          Set connections = (Set) entry.getValue();
+          Set<ServerConnection> connections = entry.getValue().getConnections();
           int socketPort = 0;
           InetAddress socketAddress = null;
           /// *
-          Iterator serverConnections = connections.iterator();
           // Get data from one.
-          while (serverConnections.hasNext()) {
-            ServerConnection sc = (ServerConnection) serverConnections.next();
+          for (ServerConnection sc : connections) {
             socketPort = sc.getSocketPort();
             socketAddress = sc.getSocketAddress();
             membershipID = sc.getMembershipID();
@@ -453,7 +433,7 @@ public class ClientHealthMonitor {
                 + " client member id=" + membershipID;
           }
           Object[] data = null;
-          data = (Object[]) map.get(membershipID);
+          data = map.get(membershipID);
           if (data == null) {
             map.put(membershipID, new Object[] {clientString, Integer.valueOf(connectionCount)});
           } else {
@@ -480,20 +460,17 @@ public class ClientHealthMonitor {
    *
    * @return Map of ClientProxyMembershipID against CacheClientStatus objects.
    */
-  public Map getStatusForAllClients() {
-    Map result = new HashMap();
-    synchronized (_clientThreadsLock) {
-      Iterator connectedClients = this._clientThreads.entrySet().iterator();
-      while (connectedClients.hasNext()) {
-        Map.Entry entry = (Map.Entry) connectedClients.next();
-        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
+  public Map<ClientProxyMembershipID, CacheClientStatus> getStatusForAllClients() {
+    Map<ClientProxyMembershipID, CacheClientStatus> result = new HashMap<>();
+    synchronized (proxyIdConnections) {
+      for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
+          .entrySet()) {
+        ClientProxyMembershipID proxyID = entry.getKey();
         CacheClientStatus cci = new CacheClientStatus(proxyID);
-        Set connections = (Set) this._clientThreads.get(proxyID);
+        Set<ServerConnection> connections = entry.getValue().getConnections();
         if (connections != null) {
           String memberId = null;
-          Iterator connectionsIterator = connections.iterator();
-          while (connectionsIterator.hasNext()) {
-            ServerConnection sc = (ServerConnection) connectionsIterator.next();
+          for (ServerConnection sc : connections) {
             if (sc.isClientServerConnection()) {
               memberId = sc.getMembershipID(); // each ServerConnection has the same member
id
               cci.setMemberId(memberId);
@@ -508,30 +485,27 @@ public class ClientHealthMonitor {
     return result;
   }
 
-  public void fillInClientInfo(Map allClients) {
+  public void fillInClientInfo(Map<ClientProxyMembershipID, CacheClientStatus> allClients)
{
     // The allClients parameter includes only actual clients (not remote
     // gateways). This monitor will include remote gateway connections,
     // so weed those out.
-    synchronized (_clientThreadsLock) {
-      Iterator allClientsIterator = allClients.entrySet().iterator();
-      while (allClientsIterator.hasNext()) {
-        Map.Entry entry = (Map.Entry) allClientsIterator.next();
-        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();// proxyID
-                                                                                   // includes
FQDN
-        CacheClientStatus cci = (CacheClientStatus) entry.getValue();
-        Set connections = (Set) this._clientThreads.get(proxyID);
+    synchronized (proxyIdConnections) {
+      for (Map.Entry<ClientProxyMembershipID, CacheClientStatus> entry : allClients.entrySet())
{
+        ClientProxyMembershipID proxyID = entry.getKey();// proxyID
+        // includes FQDN
+        CacheClientStatus cci = entry.getValue();
+        ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
+        Set<ServerConnection> connections = collection != null ? collection.getConnections()
: null;
         if (connections != null) {
           String memberId = null;
           cci.setNumberOfConnections(connections.size());
           List socketPorts = new ArrayList();
           List socketAddresses = new ArrayList();
-          Iterator connectionsIterator = connections.iterator();
-          while (connectionsIterator.hasNext()) {
-            ServerConnection sc = (ServerConnection) connectionsIterator.next();
+          for (ServerConnection sc : connections) {
             socketPorts.add(Integer.valueOf(sc.getSocketPort()));
             socketAddresses.add(sc.getSocketAddress());
             memberId = sc.getMembershipID(); // each ServerConnection has the
-                                             // same member id
+            // same member id
           }
           cci.setMemberId(memberId);
           cci.setSocketPorts(socketPorts);
@@ -541,17 +515,14 @@ public class ClientHealthMonitor {
     }
   }
 
-  public Map getConnectedIncomingGateways() {
-    Map connectedIncomingGateways = new HashMap();
-    synchronized (_clientThreadsLock) {
-      Iterator connectedClients = this._clientThreads.entrySet().iterator();
-      while (connectedClients.hasNext()) {
-        Map.Entry entry = (Map.Entry) connectedClients.next();
-        ClientProxyMembershipID proxyID = (ClientProxyMembershipID) entry.getKey();
-        Set connections = (Set) entry.getValue();
-        Iterator connectionsIterator = connections.iterator();
-        while (connectionsIterator.hasNext()) {
-          ServerConnection sc = (ServerConnection) connectionsIterator.next();
+  public Map<String, IncomingGatewayStatus> getConnectedIncomingGateways() {
+    Map<String, IncomingGatewayStatus> connectedIncomingGateways = new HashMap<>();
+    synchronized (proxyIdConnections) {
+      for (Map.Entry<ClientProxyMembershipID, ServerConnectionCollection> entry : proxyIdConnections
+          .entrySet()) {
+        ClientProxyMembershipID proxyID = entry.getKey();
+        Set<ServerConnection> connections = entry.getValue().getConnections();
+        for (ServerConnection sc : connections) {
           if (sc.getCommunicationMode().isWAN()) {
             IncomingGatewayStatus status = new IncomingGatewayStatus(proxyID.getDSMembership(),
                 sc.getSocketAddress(), sc.getSocketPort());
@@ -566,19 +537,17 @@ public class ClientHealthMonitor {
   protected boolean cleanupClientThreads(ClientProxyMembershipID proxyID, boolean timedOut)
{
     boolean result = false;
     Set serverConnections = null;
-    synchronized (this._clientThreadsLock) {
-      serverConnections = (Set) this._clientThreads.remove(proxyID);
-      // It is ok to modify the set after releasing the sync
-      // because it has been removed from the map while holding
-      // the sync.
-    } // end sync here to fix bug 37576 and 36740
+    synchronized (proxyIdConnections) {
+      ServerConnectionCollection collection = proxyIdConnections.remove(proxyID);
+      if (collection != null) {
+        serverConnections = collection.getConnections();
+      }
+    }
     {
       if (serverConnections != null) { // fix for bug 35343
         result = true;
-        // logger.warn("Terminating " + serverConnections.size() + " connections");
         for (Iterator it = serverConnections.iterator(); it.hasNext();) {
           ServerConnection serverConnection = (ServerConnection) it.next();
-          // logger.warn("Terminating " + serverConnection);
           serverConnection.handleTermination(timedOut);
         }
       }
@@ -586,54 +555,51 @@ public class ClientHealthMonitor {
     return result;
   }
 
-  protected boolean isAnyThreadProcessingMessage(ClientProxyMembershipID proxyID) {
-    boolean processingMessage = false;
-    synchronized (this._clientThreadsLock) {
-      Set serverConnections = (Set) this._clientThreads.get(proxyID);
-      if (serverConnections != null) {
-        for (Iterator it = serverConnections.iterator(); it.hasNext();) {
-          ServerConnection serverConnection = (ServerConnection) it.next();
-          if (serverConnection.isProcessingMessage()) {
-            processingMessage = true;
-            break;
-          }
-        }
+  // This will return true if the proxyID is truly idle (or if no connections are found),
or false
+  // if there was a active connection.
+  private boolean prepareToTerminateIfNoConnectionIsProcessing(ClientProxyMembershipID proxyID)
{
+    synchronized (proxyIdConnections) {
+      ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
+      if (collection == null) {
+        return true;
+      }
+      if (collection.connectionsProcessing.get() == 0) {
+        collection.isTerminating = true;
+        return true;
+      } else {
+        return false;
       }
     }
-    return processingMessage;
   }
 
   protected void validateThreads(ClientProxyMembershipID proxyID) {
-    Set serverConnections = null;
-    synchronized (this._clientThreadsLock) {
-      serverConnections = (Set) this._clientThreads.get(proxyID);
-      if (serverConnections != null) {
-        serverConnections = new HashSet(serverConnections);
-      }
+    Set<ServerConnection> serverConnections;
+    synchronized (proxyIdConnections) {
+      ServerConnectionCollection collection = proxyIdConnections.get(proxyID);
+      serverConnections =
+          collection != null ? new HashSet<>(collection.getConnections()) : Collections.emptySet();
     }
     // release sync and operation on copy to fix bug 37675
-    if (serverConnections != null) {
-      for (Iterator it = serverConnections.iterator(); it.hasNext();) {
-        ServerConnection serverConnection = (ServerConnection) it.next();
-        if (serverConnection.hasBeenTimedOutOnClient()) {
-          logger.warn(LocalizedMessage.create(
-              LocalizedStrings.ClientHealtMonitor_0_IS_BEING_TERMINATED_BECAUSE_ITS_CLIENT_TIMEOUT_OF_1_HAS_EXPIRED,
-              new Object[] {serverConnection,
-                  Integer.valueOf(serverConnection.getClientReadTimeout())}));
-          try {
-            serverConnection.handleTermination(true);
-            // Not all the code in a ServerConnection correctly
-            // handles interrupt. In particular it is possible to be doing
-            // p2p distribution and to have sent a message to one peer but
-            // to never send it to another due to interrupt.
-            // serverConnection.interruptOwner();
-          } finally {
-            // Just to be sure we clean it up.
-            // This call probably isn't needed.
-            removeConnection(proxyID, serverConnection);
-          }
+    for (ServerConnection serverConnection : serverConnections) {
+      if (serverConnection.hasBeenTimedOutOnClient()) {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.ClientHealtMonitor_0_IS_BEING_TERMINATED_BECAUSE_ITS_CLIENT_TIMEOUT_OF_1_HAS_EXPIRED,
+            new Object[] {serverConnection,
+                Integer.valueOf(serverConnection.getClientReadTimeout())}));
+        try {
+          serverConnection.handleTermination(true);
+          // Not all the code in a ServerConnection correctly
+          // handles interrupt. In particular it is possible to be doing
+          // p2p distribution and to have sent a message to one peer but
+          // to never send it to another due to interrupt.
+          // serverConnection.interruptOwner();
+        } finally {
+          // Just to be sure we clean it up.
+          // This call probably isn't needed.
+          removeConnection(proxyID, serverConnection);
         }
       }
+
     }
   }
 
@@ -688,9 +654,6 @@ public class ClientHealthMonitor {
     this._cache = cache;
     this.maximumTimeBetweenPings = maximumTimeBetweenPings;
 
-    // Initialize the client threads map
-    this._clientThreads = new HashMap();
-
     this.monitorInterval = Long.getLong(CLIENT_HEALTH_MONITOR_INTERVAL_PROPERTY,
         DEFAULT_CLIENT_MONITOR_INTERVAL_IN_MILLIS);
     logger.debug("Setting monitorInterval to {}", this.monitorInterval);
@@ -722,6 +685,10 @@ public class ClientHealthMonitor {
     return "ClientHealthMonitor@" + Integer.toHexString(System.identityHashCode(this));
   }
 
+  public ServerConnectionCollection getProxyIdCollection(ClientProxyMembershipID proxyID)
{
+    return proxyIdConnections.computeIfAbsent(proxyID, key -> new ServerConnectionCollection());
+  }
+
   public Map getCleanupProxyIdTable() {
     return cleanupProxyIdTable;
   }
@@ -828,8 +795,6 @@ public class ClientHealthMonitor {
           if (logger.isTraceEnabled()) {
             logger.trace("Monitoring {} client(s)", getClientHeartbeats().size());
           }
-          // logger.warning("Monitoring " + getClientHeartbeats().size() +
-          // " client(s).");
 
           // Get the current time
           long currentTime = System.currentTimeMillis();
@@ -863,19 +828,19 @@ public class ClientHealthMonitor {
                 // This client has been idle for too long. Determine whether
                 // any of its ServerConnection threads are currently processing
                 // a message. If so, let it go. If not, disconnect it.
-                if (isAnyThreadProcessingMessage(proxyID)) {
-                  if (logger.isDebugEnabled()) {
-                    logger.debug(
-                        "Monitoring client with member id {}. It has been {} ms since the
latest heartbeat. This client would have been terminated but at least one of its threads is
processing a message.",
-                        entry.getKey(), (currentTime - latestHeartbeat));
-                  }
-                } else {
+                if (prepareToTerminateIfNoConnectionIsProcessing(proxyID)) {
                   if (cleanupClientThreads(proxyID, true)) {
                     logger.warn(LocalizedMessage.create(
                         LocalizedStrings.ClientHealthMonitor_MONITORING_CLIENT_WITH_MEMBER_ID_0_IT_HAD_BEEN_1_MS_SINCE_THE_LATEST_HEARTBEAT_MAX_INTERVAL_IS_2_TERMINATED_CLIENT,
                         new Object[] {entry.getKey(), currentTime - latestHeartbeat,
                             this._maximumTimeBetweenPings}));
                   }
+                } else {
+                  if (logger.isDebugEnabled()) {
+                    logger.debug(
+                        "Monitoring client with member id {}. It has been {} ms since the
latest heartbeat. This client would have been terminated but at least one of its threads is
processing a message.",
+                        entry.getKey(), (currentTime - latestHeartbeat));
+                  }
                 }
               } else {
                 if (logger.isTraceEnabled()) {
@@ -883,10 +848,6 @@ public class ClientHealthMonitor {
                       "Monitoring client with member id {}. It has been {} ms since the latest
heartbeat. This client is healthy.",
                       entry.getKey(), (currentTime - latestHeartbeat));
                 }
-                // logger.warning("Monitoring client with member id " +
-                // entry.getKey() + ". It has been " + (currentTime -
-                // latestHeartbeat) + " ms since the latest heartbeat. This
-                // client is healthy.");
               }
             }
           }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 91293f0..4b1f2c7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -1683,7 +1683,7 @@ public class HandShake implements ClientHandShake {
 
     Authenticator auth = null;
     try {
-      if (AcceptorImpl.isIntegratedSecurity()) {
+      if (securityService.isIntegratedSecurity()) {
         return securityService.login(credentials);
       } else {
         Method instanceGetter = ClassLoadUtil.methodFromName(authenticatorMethod);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 273485e..d4e5969 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -42,7 +42,6 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.client.internal.AbstractOp;
 import org.apache.geode.cache.client.internal.Connection;
 import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.HeapDataOutputStream;
 import org.apache.geode.internal.Version;
@@ -97,7 +96,7 @@ public abstract class ServerConnection implements Runnable {
 
   private Map commands;
 
-  private final SecurityService securityService;
+  protected final SecurityService securityService;
 
   protected final CacheServerStats stats;
 
@@ -107,6 +106,7 @@ public abstract class ServerConnection implements Runnable {
   // The key is the size of each ByteBuffer. The value is a queue of byte buffers all of
that size.
   private static final ConcurrentHashMap<Integer, LinkedBlockingQueue<ByteBuffer>>
commBufferMap =
       new ConcurrentHashMap<>(4, 0.75f, 1);
+  private ServerConnectionCollection serverConnectionCollection;
 
   public static ByteBuffer allocateCommBuffer(int size, Socket sock) {
     // I expect that size will almost always be the same value
@@ -194,17 +194,8 @@ public abstract class ServerConnection implements Runnable {
    */
   private int latestBatchIdReplied = -1;
 
-  /*
-   * Uniquely identifying the client's Distributed System
-   *
-   *
-   * private String membershipId;
-   *
-   *
-   * Uniquely identifying the client's ConnectionProxy object
-   *
-   *
-   * private String proxyID ;
+  /**
+   * Client identity from handshake
    */
   ClientProxyMembershipID proxyId;
 
@@ -333,7 +324,8 @@ public abstract class ServerConnection implements Runnable {
     synchronized (this.handShakeMonitor) {
       if (this.handshake == null) {
         // synchronized (getCleanupTable()) {
-        boolean readHandShake = ServerHandShakeProcessor.readHandShake(this, getSecurityService());
+        boolean readHandShake =
+            ServerHandShakeProcessor.readHandShake(this, getSecurityService(), acceptor);
         if (readHandShake) {
           if (this.handshake.isOK()) {
             try {
@@ -469,11 +461,7 @@ public abstract class ServerConnection implements Runnable {
     return acceptor.getClientHealthMonitor().getCleanupProxyIdTable();
   }
 
-  private ClientHealthMonitor getClientHealthMonitor() {
-    return acceptor.getClientHealthMonitor();
-  }
-
-  private boolean processHandShake() {
+  protected boolean processHandShake() {
     boolean result = false;
     boolean clientJoined = false;
     boolean registerClient = false;
@@ -559,8 +547,6 @@ public abstract class ServerConnection implements Runnable {
           numRefs = new Counter();
           numRefs.incr();
           getCleanupProxyIdTable().put(this.proxyId, numRefs);
-          InternalDistributedMember idm =
-              (InternalDistributedMember) this.proxyId.getDistributedMember();
         }
         this.incedCleanupProxyIdTableRef = true;
       }
@@ -583,7 +569,7 @@ public abstract class ServerConnection implements Runnable {
         chm.registerClient(this.proxyId);
       }
       // hitesh:it will add client connection in set
-      chm.addConnection(this.proxyId, this);
+      serverConnectionCollection = chm.addConnection(this.proxyId, this);
       this.acceptor.getConnectionListener().connectionOpened(registerClient, communicationMode);
       // Hitesh: add user creds in map for single user case.
     } // finally
@@ -725,8 +711,22 @@ public abstract class ServerConnection implements Runnable {
   }
 
   protected void doNormalMsg() {
+    if (serverConnectionCollection == null) {
+      // return here if we haven't successfully completed handshake
+      logger.warn("Continued processing ServerConnection after handshake failed");
+      this.processMessages = false;
+      return;
+    }
     Message msg = null;
     msg = BaseCommand.readRequest(this);
+    synchronized (serverConnectionCollection) {
+      if (serverConnectionCollection.isTerminating) {
+        // Client is being disconnected, don't try to process message.
+        this.processMessages = false;
+        return;
+      }
+      serverConnectionCollection.connectionsProcessing.incrementAndGet();
+    }
     ThreadState threadState = null;
     try {
       if (msg != null) {
@@ -775,7 +775,7 @@ public abstract class ServerConnection implements Runnable {
 
         // if a subject exists for this uniqueId, binds the subject to this thread so that
we can do
         // authorization later
-        if (AcceptorImpl.isIntegratedSecurity()
+        if (securityService.isIntegratedSecurity()
             && !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)
             && !this.communicationMode.isWAN()) {
           long uniqueId = getUniqueId();
@@ -799,13 +799,13 @@ public abstract class ServerConnection implements Runnable {
     } finally {
       // Keep track of the fact that a message is no longer being
       // processed.
+      serverConnectionCollection.connectionsProcessing.decrementAndGet();
       setNotProcessingMessage();
       clearRequestMsg();
       if (threadState != null) {
         threadState.clear();
       }
     }
-
   }
 
   private final Object terminationLock = new Object();
@@ -874,8 +874,6 @@ public abstract class ServerConnection implements Runnable {
             getCleanupProxyIdTable().remove(this.proxyId);
             // here we can remove entry multiuser map for client
             proxyIdVsClientUserAuths.remove(this.proxyId);
-            InternalDistributedMember idm =
-                (InternalDistributedMember) this.proxyId.getDistributedMember();
           }
         }
       }
@@ -937,7 +935,7 @@ public abstract class ServerConnection implements Runnable {
     return retCua;
   }
 
-  private void initializeCommands() {
+  protected void initializeCommands() {
     // The commands are cached here, but are just referencing the ones
     // stored in the CommandInitializer
     this.commands = CommandInitializer.getCommands(this);
@@ -1499,6 +1497,7 @@ public abstract class ServerConnection implements Runnable {
       logger.debug("{}: Closed connection", this.name);
     }
     releaseCommBuffer();
+    processMessages = false;
     return true;
   }
 
@@ -1762,7 +1761,7 @@ public abstract class ServerConnection implements Runnable {
       return null;
     }
 
-    if (AcceptorImpl.isIntegratedSecurity()) {
+    if (securityService.isIntegratedSecurity()) {
       return null;
     }
 
@@ -1796,7 +1795,7 @@ public abstract class ServerConnection implements Runnable {
       return null;
     }
 
-    if (AcceptorImpl.isIntegratedSecurity()) {
+    if (securityService.isIntegratedSecurity()) {
       return null;
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionCollection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionCollection.java
new file mode 100644
index 0000000..670b5a1
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionCollection.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// This is used to form of group of connections for a particular client. Note that these
objects are
+// managed by the ClientHealthMonitor, which also manages the synchronization of them.
+public class ServerConnectionCollection {
+  private Set<ServerConnection> connectionSet = new HashSet<ServerConnection>();
+
+  // Number of connections currently processing messages for this client
+  final AtomicInteger connectionsProcessing = new AtomicInteger();
+
+  // Indicates that the server is soon to be or already in the process of terminating connections
in
+  // this collection.
+  volatile boolean isTerminating = false;
+
+  public void addConnection(ServerConnection connection) {
+    connectionSet.add(connection);
+  }
+
+  public Set<ServerConnection> getConnections() {
+    return connectionSet;
+  }
+
+  public void removeConnection(ServerConnection connection) {
+    connectionSet.remove(connection);
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerHandShakeProcessor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerHandShakeProcessor.java
index c4265cd..e292813 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerHandShakeProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerHandShakeProcessor.java
@@ -79,8 +79,8 @@ public class ServerHandShakeProcessor {
     currentServerVersion = Version.fromOrdinalOrCurrent(ver);
   }
 
-  public static boolean readHandShake(ServerConnection connection,
-      SecurityService securityService) {
+  public static boolean readHandShake(ServerConnection connection, SecurityService securityService,
+      AcceptorImpl acceptorImpl) {
     boolean validHandShake = false;
     Version clientVersion = null;
     try {
@@ -123,7 +123,7 @@ public class ServerHandShakeProcessor {
 
       // Read the appropriate handshake
       if (clientVersion.compareTo(Version.GFE_57) >= 0) {
-        validHandShake = readGFEHandshake(connection, clientVersion, securityService);
+        validHandShake = readGFEHandshake(connection, clientVersion, securityService, acceptorImpl);
       } else {
         connection.refuseHandshake(
             "Unsupported version " + clientVersion + "Server's current version " + Acceptor.VERSION,
@@ -200,7 +200,7 @@ public class ServerHandShakeProcessor {
   }
 
   private static boolean readGFEHandshake(ServerConnection connection, Version clientVersion,
-      SecurityService securityService) {
+      SecurityService securityService, AcceptorImpl acceptorImpl) {
     int handShakeTimeout = connection.getHandShakeTimeout();
     InternalLogWriter securityLogWriter = connection.getSecurityLogWriter();
     try {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index dbda3d7..b7f0e7b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -20,6 +20,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.internal.i18n.LocalizedStrings.HandShake_NO_SECURITY_CREDENTIALS_ARE_PROVIDED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -29,6 +30,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -38,10 +43,16 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.test.junit.categories.UnitTest;
@@ -70,21 +81,29 @@ public class ServerConnectionTest {
   @InjectMocks
   private ServerConnection serverConnection;
 
+  private AcceptorImpl acceptor;
+  private Socket socket;
+  private InternalCache cache;
+  private SecurityService securityService;
+  private CacheServerStats stats;
+
   @Before
   public void setUp() throws IOException {
-    AcceptorImpl acceptor = mock(AcceptorImpl.class);
+    acceptor = mock(AcceptorImpl.class);
 
     InetAddress inetAddress = mock(InetAddress.class);
     when(inetAddress.getHostAddress()).thenReturn("localhost");
 
-    Socket socket = mock(Socket.class);
+    socket = mock(Socket.class);
     when(socket.getInetAddress()).thenReturn(inetAddress);
 
-    InternalCache cache = mock(InternalCache.class);
-    SecurityService securityService = mock(SecurityService.class);
+    cache = mock(InternalCache.class);
+    securityService = mock(SecurityService.class);
+
+    stats = mock(CacheServerStats.class);
 
     serverConnection =
-        new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0,
0, null,
+        new ServerConnectionFactory().makeServerConnection(socket, cache, null, stats, 0,
0, null,
             CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
     MockitoAnnotations.initMocks(this);
   }
@@ -139,4 +158,114 @@ public class ServerConnectionTest {
         .hasMessage(HandShake_NO_SECURITY_CREDENTIALS_ARE_PROVIDED.getRawText());
   }
 
+  class TestMessage extends Message {
+    private final Lock lock = new ReentrantLock();
+    private final Condition testGate = lock.newCondition();
+    private boolean signalled = false;
+
+    public TestMessage() {
+      super(3, Version.CURRENT);
+      messageType = MessageType.REQUEST;
+      securePart = new Part();
+    }
+
+    @Override
+    public void recv() throws IOException {
+      try {
+        lock.lock();
+        testGate.await(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+        if (!signalled) {
+          fail("Message never received continueProcessing call");
+        }
+      }
+    }
+
+    public void continueProcessing() {
+      lock.lock();
+      testGate.signal();
+      signalled = true;
+      lock.unlock();
+    }
+  }
+
+  class TestServerConnection extends LegacyServerConnection {
+
+    private TestMessage testMessage;
+
+    /**
+     * Creates a new <code>ServerConnection</code> that processes messages received
from an edge
+     * client over a given <code>Socket</code>.
+     */
+    public TestServerConnection(Socket socket, InternalCache internalCache,
+        CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+        String communicationModeStr, byte communicationMode, Acceptor acceptor,
+        SecurityService securityService) {
+      super(socket, internalCache, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
+          communicationMode, acceptor, securityService);
+
+      setClientDisconnectCleanly(); // Not clear where this is supposed to be set in the
timeout
+                                    // path
+    }
+
+    @Override
+    protected void doHandshake() {
+      ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+      when(proxyID.getDistributedMember()).thenReturn(mock(InternalDistributedMember.class));
+      HandShake handShake = mock(HandShake.class);
+      when(handShake.getMembership()).thenReturn(proxyID);
+      when(handShake.getVersion()).thenReturn(Version.CURRENT);
+
+      setHandshake(handShake);
+      setProxyId(proxyID);
+
+      processHandShake();
+      initializeCommands();
+
+      setFakeRequest();
+
+      long fakeId = -1;
+      MessageIdExtractor extractor = mock(MessageIdExtractor.class);
+      when(extractor.getUniqueIdFromMessage(getRequestMessage(), handShake,
+          Connection.DEFAULT_CONNECTION_ID)).thenReturn(fakeId);
+      setMessageIdExtractor(extractor);
+    }
+
+    @Override
+    void handleTermination(boolean timedOut) {
+      super.handleTermination(timedOut);
+      testMessage.continueProcessing();
+    }
+
+    private void setFakeRequest() {
+      testMessage = new TestMessage();
+      setRequestMsg(testMessage);
+    }
+  }
+
+  /**
+   * This test sets up a TestConnection which will register with the ClientHealthMonitor
and then
+   * block waiting to receive a fake message. This message will arrive just after the health
monitor
+   * times out this connection and kills it. The test then makes sure that the connection
correctly
+   * handles the terminated state and exits.
+   */
+  @Test
+  public void terminatingConnectionHandlesNewRequestsGracefully() throws Exception {
+    when(cache.getCacheTransactionManager()).thenReturn(mock(TXManagerImpl.class));
+    ClientHealthMonitor.createInstance(cache, 100, mock(CacheClientNotifierStats.class));
+    ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
+    when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
+    when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
+    when(securityService.isIntegratedSecurity()).thenReturn(true);
+
+    TestServerConnection testServerConnection =
+        new TestServerConnection(socket, cache, mock(CachedRegionHelper.class), stats, 0,
0, null,
+            CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
+    MockitoAnnotations.initMocks(this);
+
+    testServerConnection.run();
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
wirebaron@apache.org.

Mime
View raw message