geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [03/17] geode git commit: Cleanup CacheClientNotifier
Date Mon, 22 May 2017 18:51:00 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/eeab2576/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index e79bfbd..4bd4970 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -93,7 +93,6 @@ import org.apache.geode.internal.cache.ClientRegionEventImpl;
 import org.apache.geode.internal.cache.ClientServerObserver;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
 import org.apache.geode.internal.cache.Conflatable;
-import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
@@ -120,29 +119,158 @@ import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
 
 /**
- * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
- * to clients requesting notification of updates and notifies them when updates occur.
+ * Class {@code CacheClientNotifier} works on the server and manages client socket connections to
+ * clients requesting notification of updates and notifies them when updates occur.
  *
  * @since GemFire 3.2
  */
-@SuppressWarnings({"synthetic-access", "deprecation"})
 public class CacheClientNotifier {
   private static final Logger logger = LogService.getLogger();
 
   private static volatile CacheClientNotifier ccnSingleton;
 
   /**
-   * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
+   * The map of known {@code CacheClientProxy} instances. Maps ClientProxyMembershipID to
+   * CacheClientProxy. Note that the keys in this map are not updated when a durable client
+   * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
+   * CacheClientProxy and then call getProxyID on it.
+   * <p>
+   * NOTE: package-private to avoid synthetic accessor
+   */
+  final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ clientProxies =
+      new ConcurrentHashMap();
+
+  /**
+   * The map of {@code CacheClientProxy} instances which are getting initialized. Maps
+   * ClientProxyMembershipID to CacheClientProxy.
+   */
+  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ initClientProxies =
+      new ConcurrentHashMap();
+
+  private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>();
+
+  /**
+   * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use
+   * a direct reference to cache in CacheClientNotifier code. Instead, you should always use
+   * {@code getCache()}
+   */
+  private InternalCache cache; // TODO: fix synchronization of cache
+
+  private InternalLogWriter logWriter;
+
+  /**
+   * The GemFire security {@code LogWriter}
+   */
+  private InternalLogWriter securityLogWriter;
+
+  /** the maximum number of messages that can be enqueued in a client-queue. */
+  private final int maximumMessageCount;
+
+  /**
+   * the time (in seconds) after which a message in the client queue will expire.
+   */
+  private final int messageTimeToLive;
+
+  /**
+   * A listener which receives notifications about queues that are added or removed
+   */
+  private final ConnectionListener connectionListener;
+
+  private final CacheServerStats acceptorStats;
+
+  /**
+   * haContainer can hold either the name of the client-messages-region (in case of eviction
+   * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
+   * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
+   */
+  private volatile HAContainerWrapper haContainer;
+
+  /**
+   * The size of the server-to-client communication socket buffers. This can be modified using the
+   * BridgeServer.SOCKET_BUFFER_SIZE system property.
+   */
+  private static final int socketBufferSize =
+      Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
+
+  /**
+   * The statistics for this notifier
+   */
+  final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy then
+                                             // make private
+
+  /**
+   * The {@code InterestRegistrationListener} instances registered in this VM. This is used when
+   * modifying the set of listeners.
+   */
+  private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
+
+  /**
+   * The {@code InterestRegistrationListener} instances registered in this VM. This is used to
+   * provide a read-only {@code Set} of listeners.
+   */
+  private final Set readableInterestRegistrationListeners =
+      Collections.unmodifiableSet(this.writableInterestRegistrationListeners);
+
+  /**
+   * System property name for indicating how much frequently the "Queue full" message should be
+   * logged.
+   */
+  private static final String MAX_QUEUE_LOG_FREQUENCY =
+      DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
+
+  public static final long DEFAULT_LOG_FREQUENCY = 1000;
+
+  private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
+      DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
+
+  private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
+
+  /**
+   * System property value denoting the time in milliseconds. Any thread putting an event into a
+   * subscription queue, which is full, will wait this much time for the queue to make space. It'll
+   * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
+   * #51400.
+   */
+  public static int eventEnqueueWaitTime; // TODO: encapsulate eventEnqueueWaitTime
+
+  /**
+   * The frequency of logging the "Queue full" message.
+   */
+  private long logFrequency = DEFAULT_LOG_FREQUENCY;
+
+  private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>();
+
+  private volatile boolean isCompiledQueryCleanupThreadStarted = false;
+
+  private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
+
+  private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask
+
+  private final SocketCloser socketCloser;
+
+  private static final long CLIENT_PING_TASK_PERIOD =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
+
+  /**
+   * package-private to avoid synthetic accessor
+   */
+  static final long CLIENT_PING_TASK_COUNTER =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
+
+  private final Set blackListedClients = new CopyOnWriteArraySet();
+
+  /**
+   * Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
    *
-   * @param cache The GemFire <code>InternalCache</code>
-   * @return A <code>CacheClientNotifier</code> instance
+   * @param cache The GemFire {@code InternalCache}
+   * @return A {@code CacheClientNotifier} instance
    */
   public static synchronized CacheClientNotifier getInstance(InternalCache cache,
       CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
       ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
-          messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
+          messageTimeToLive, listener, isGatewayReceiver);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -158,20 +286,72 @@ public class CacheClientNotifier {
   }
 
   /**
+   * @param cache The GemFire {@code InternalCache}
+   * @param listener a listener which should receive notifications abouts queues being added or
+   *        removed.
+   */
+  private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
+      int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
+      boolean isGatewayReceiver) {
+    // Set the Cache
+    setCache(cache);
+    this.acceptorStats = acceptorStats;
+    // we only need one thread per client and wait 50ms for close
+    this.socketCloser = new SocketCloser(1, 50);
+
+    // Set the LogWriter
+    this.logWriter = (InternalLogWriter) cache.getLogger();
+
+    this.connectionListener = listener;
+
+    // Set the security LogWriter
+    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
+
+    this.maximumMessageCount = maximumMessageCount;
+    this.messageTimeToLive = messageTimeToLive;
+
+    // Initialize the statistics
+    StatisticsFactory factory;
+    if (isGatewayReceiver) {
+      factory = new DummyStatisticsFactory();
+    } else {
+      factory = getCache().getDistributedSystem();
+    }
+    this.statistics = new CacheClientNotifierStats(factory);
+
+    try {
+      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+      if (this.logFrequency <= 0) {
+        this.logFrequency = DEFAULT_LOG_FREQUENCY;
+      }
+    } catch (Exception e) {
+      this.logFrequency = DEFAULT_LOG_FREQUENCY;
+    }
+
+    eventEnqueueWaitTime =
+        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+    if (eventEnqueueWaitTime < 0) {
+      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+    }
+
+    // Schedule task to periodically ping clients.
+    scheduleClientPingTask();
+  }
+
+  /**
    * Writes a given message to the output stream
    *
-   * @param dos the <code>DataOutputStream</code> to use for writing the message
+   * @param dos the {@code DataOutputStream} to use for writing the message
    * @param type a byte representing the message type
-   * @param p_msg the message to be written; can be null
+   * @param message the message to be written; can be null
    */
-  private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion)
+  private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion)
       throws IOException {
-    writeMessage(dos, type, p_msg, clientVersion, (byte) 0x00, 0);
+    writeMessage(dos, type, message, clientVersion, (byte) 0x00, 0);
   }
 
-  private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion,
+  private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion,
       byte epType, int qSize) throws IOException {
-    String msg = p_msg;
 
     // write the message type
     dos.writeByte(type);
@@ -181,6 +361,7 @@ public class CacheClientNotifier {
     // dummy qSize
     dos.writeInt(qSize);
 
+    String msg = message;
     if (msg == null) {
       msg = "";
     }
@@ -188,10 +369,10 @@ public class CacheClientNotifier {
     if (clientVersion != null && clientVersion.compareTo(Version.GFE_61) >= 0) {
       // get all the instantiators.
       Instantiator[] instantiators = InternalInstantiator.getInstantiators();
-      HashMap instantiatorMap = new HashMap();
+      Map instantiatorMap = new HashMap();
       if (instantiators != null && instantiators.length > 0) {
         for (Instantiator instantiator : instantiators) {
-          ArrayList instantiatorAttributes = new ArrayList();
+          List<String> instantiatorAttributes = new ArrayList<>();
           instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
           instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
           instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
@@ -201,15 +382,14 @@ public class CacheClientNotifier {
 
       // get all the dataserializers.
       DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
-      HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
-          new HashMap<Integer, ArrayList<String>>();
-      HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>();
+      Map<Integer, List<String>> dsToSupportedClasses = new HashMap<>();
+      Map<Integer, String> dataSerializersMap = new HashMap<>();
       if (dataSerializers != null && dataSerializers.length > 0) {
         for (DataSerializer dataSerializer : dataSerializers) {
           dataSerializersMap.put(dataSerializer.getId(),
               dataSerializer.getClass().toString().substring(6));
           if (clientVersion.compareTo(Version.GFE_6516) >= 0) {
-            ArrayList<String> supportedClassNames = new ArrayList<String>();
+            List<String> supportedClassNames = new ArrayList<>();
             for (Class clazz : dataSerializer.getSupportedClasses()) {
               supportedClassNames.add(clazz.getName());
             }
@@ -228,7 +408,7 @@ public class CacheClientNotifier {
   /**
    * Writes an exception message to the socket
    *
-   * @param dos the <code>DataOutputStream</code> to use for writing the message
+   * @param dos the {@code DataOutputStream} to use for writing the message
    * @param type a byte representing the exception type
    * @param ex the exception to be written; should not be null
    */
@@ -245,7 +425,7 @@ public class CacheClientNotifier {
   public void registerClient(Socket socket, boolean isPrimary, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Since no remote ports were specified in the message, wait for them.
-    long startTime = this._statistics.startTime();
+    long startTime = this.statistics.startTime();
     DataInputStream dis = new DataInputStream(socket.getInputStream());
     DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
 
@@ -261,7 +441,7 @@ public class CacheClientNotifier {
       SocketAddress sa = socket.getRemoteSocketAddress();
       UnsupportedVersionException uve = e;
       if (sa != null) {
-        String sInfo = " Client: " + sa.toString() + ".";
+        String sInfo = " Client: " + sa + ".";
         uve = new UnsupportedVersionException(e.getMessage() + sInfo);
       }
       logger.warn(
@@ -272,8 +452,7 @@ public class CacheClientNotifier {
       return;
     }
 
-    // Read and ignore the reply code. This is used on the client to server
-    // handshake.
+    // Read and ignore the reply code. This is used on the client to server handshake.
     dis.readByte(); // replyCode
 
     if (Version.GFE_57.compareTo(clientVersion) <= 0) {
@@ -289,7 +468,7 @@ public class CacheClientNotifier {
     }
   }
 
-  protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+  private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
       boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Read the ports and throw them away. We no longer need them
@@ -299,9 +478,6 @@ public class CacheClientNotifier {
     }
     // Read the handshake identifier and convert it to a string member id
     ClientProxyMembershipID proxyID = null;
-    CacheClientProxy proxy;
-    AccessControl authzCallback = null;
-    byte clientConflation = HandShake.CONFLATION_DEFAULT;
     try {
       proxyID = ClientProxyMembershipID.readCanonicalized(dis);
       if (getBlacklistedClient().contains(proxyID)) {
@@ -309,13 +485,14 @@ public class CacheClientNotifier {
             new Exception("This client is blacklisted by server"), clientVersion);
         return;
       }
-      proxy = getClientProxy(proxyID);
+      CacheClientProxy proxy = getClientProxy(proxyID);
       DistributedMember member = proxyID.getDistributedMember();
 
-      DistributedSystem system = this.getCache().getDistributedSystem();
+      DistributedSystem system = getCache().getDistributedSystem();
       Properties sysProps = system.getProperties();
       String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR);
 
+      byte clientConflation;
       if (clientVersion.compareTo(Version.GFE_603) >= 0) {
         byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()});
         clientConflation = overrides[0];
@@ -339,27 +516,23 @@ public class CacheClientNotifier {
 
       Properties credentials = HandShake.readCredentials(dis, dos, system);
       if (credentials != null && proxy != null) {
-        if (securityLogWriter.fineEnabled()) {
-          securityLogWriter
+        if (this.securityLogWriter.fineEnabled()) {
+          this.securityLogWriter
               .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
         }
         Object subject = HandShake.verifyCredentials(authenticator, credentials,
             system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
         if (subject instanceof Principal) {
           Principal principal = (Principal) subject;
-          if (securityLogWriter.fineEnabled()) {
-            securityLogWriter
+          if (this.securityLogWriter.fineEnabled()) {
+            this.securityLogWriter
                 .fine("CacheClientNotifier: successfully verified credentials for proxyID: "
                     + proxyID + " having principal: " + principal.getName());
           }
 
           String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-          if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
-            if (principal == null) {
-              securityLogWriter.warning(
-                  LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
-                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
-            }
+          AccessControl authzCallback = null;
+          if (postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty()) {
             Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
             authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
             authzCallback.init(principal, member, this.getCache());
@@ -374,13 +547,13 @@ public class CacheClientNotifier {
           LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
               .toLocalizedString(e));
     } catch (AuthenticationRequiredException ex) {
-      securityLogWriter.warning(
+      this.securityLogWriter.warning(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
           new Object[] {proxyID, ex});
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
       return;
     } catch (AuthenticationFailedException ex) {
-      securityLogWriter.warning(
+      this.securityLogWriter.warning(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
           new Object[] {proxyID, ex});
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
@@ -389,11 +562,10 @@ public class CacheClientNotifier {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1,
           new Object[] {this, e}), e);
-      IOException io = new IOException(
+      throw new IOException(
           LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0
-              .toLocalizedString(e.getMessage()));
-      io.initCause(e);
-      throw io;
+              .toLocalizedString(e.getMessage()),
+          e);
     } catch (Exception ex) {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
@@ -402,7 +574,7 @@ public class CacheClientNotifier {
       return;
     }
 
-    this._statistics.endClientRegistration(startTime);
+    this.statistics.endClientRegistration(startTime);
   }
 
   /**
@@ -410,14 +582,13 @@ public class CacheClientNotifier {
    *
    * @param socket The socket over which the server communicates with the client.
    * @param proxyId The distributed member id of the client being registered
-   * @param proxy The <code>CacheClientProxy</code> of the given <code>proxyId</code>
+   * @param proxy The {@code CacheClientProxy} of the given {@code proxyId}
    *
    * @return CacheClientProxy for the registered client
    */
   private CacheClientProxy registerClient(Socket socket, ClientProxyMembershipID proxyId,
       CacheClientProxy proxy, boolean isPrimary, byte clientConflation, Version clientVersion,
       long acceptorId, boolean notifyBySubscription) throws IOException, CacheException {
-    CacheClientProxy l_proxy = proxy;
 
     // Initialize the socket
     socket.setTcpNoDelay(true);
@@ -431,9 +602,6 @@ public class CacheClientNotifier {
     }
 
     // Determine whether the client is durable or not.
-    byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
-    String unsuccessfulMsg = null;
-    boolean successful = true;
     boolean clientIsDurable = proxyId.isDurable();
     if (logger.isDebugEnabled()) {
       if (clientIsDurable) {
@@ -446,8 +614,11 @@ public class CacheClientNotifier {
 
     byte epType = 0x00;
     int qSize = 0;
+    byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
+    String unsuccessfulMsg = null;
+    boolean successful = true;
     if (clientIsDurable) {
-      if (l_proxy == null) {
+      if (proxy == null) {
         if (isTimedOut(proxyId)) {
           qSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT;
         } else {
@@ -459,9 +630,9 @@ public class CacheClientNotifier {
               "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
               proxyId.getDurableId());
         }
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
-        successful = this.initializeProxy(l_proxy);
+        successful = this.initializeProxy(proxy);
       } else {
         if (proxy.isPrimary()) {
           epType = (byte) 2;
@@ -470,27 +641,27 @@ public class CacheClientNotifier {
         }
         qSize = proxy.getQueueSize();
         // A proxy exists for this durable client. It must be reinitialized.
-        if (l_proxy.isPaused()) {
+        if (proxy.isPaused()) {
           if (CacheClientProxy.testHook != null) {
             CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
           }
-          if (l_proxy.lockDrain()) {
+          if (proxy.lockDrain()) {
             try {
               if (logger.isDebugEnabled()) {
                 logger.debug(
                     "CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}",
-                    proxyId.getDurableId(), l_proxy);
+                    proxyId.getDurableId(), proxy);
               }
-              this._statistics.incDurableReconnectionCount();
-              l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
-              l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
+              this.statistics.incDurableReconnectionCount();
+              proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
+              proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
                   clientVersion);
-              l_proxy.setMarkerEnqueued(true);
+              proxy.setMarkerEnqueued(true);
               if (CacheClientProxy.testHook != null) {
                 CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
               }
             } finally {
-              l_proxy.unlockDrain();
+              proxy.unlockDrain();
             }
           } else {
             unsuccessfulMsg =
@@ -507,7 +678,7 @@ public class CacheClientNotifier {
           // client is already using this durable id.
           unsuccessfulMsg =
               LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_THE_REQUESTED_DURABLE_CLIENT_HAS_THE_SAME_IDENTIFIER__0__AS_AN_EXISTING_DURABLE_CLIENT__1__DUPLICATE_DURABLE_CLIENTS_ARE_NOT_ALLOWED
-                  .toLocalizedString(new Object[] {proxyId.getDurableId(), proxy});
+                  .toLocalizedString(proxyId.getDurableId(), proxy);
           logger.warn(unsuccessfulMsg);
           // Set the unsuccessful response byte.
           responseByte = HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT;
@@ -537,18 +708,18 @@ public class CacheClientNotifier {
 
       if (toCreateNewProxy) {
         // Create the new proxy for this non-durable client
-        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
-        successful = this.initializeProxy(l_proxy);
+        successful = this.initializeProxy(proxy);
       }
     }
 
     if (!successful) {
-      l_proxy = null;
+      proxy = null;
       responseByte = HandShake.REPLY_REFUSED;
       unsuccessfulMsg =
           LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_A_PREVIOUS_CONNECTION_ATTEMPT_FROM_THIS_CLIENT_IS_STILL_BEING_PROCESSED__0
-              .toLocalizedString(new Object[] {proxyId});
+              .toLocalizedString(proxyId);
     }
 
     // Tell the client that the proxy has been registered using the response
@@ -562,10 +733,10 @@ public class CacheClientNotifier {
       // write the message type, message length and the error message (if any)
       writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize);
     } catch (IOException ioe) {// remove the added proxy if we get IOException.
-      if (l_proxy != null) {
-        boolean keepProxy = l_proxy.close(false, false); // do not check for queue, just close it
+      if (proxy != null) {
+        boolean keepProxy = proxy.close(false, false); // do not check for queue, just close it
         if (!keepProxy) {
-          removeClientProxy(l_proxy);
+          removeClientProxy(proxy);
         }
       }
       throw ioe;
@@ -580,41 +751,39 @@ public class CacheClientNotifier {
     // will ensure that the response byte is sent to the client before
     // the marker message. If the client is durable, the message processor
     // is not started until the clientReady message is received.
-    if (!clientIsDurable && l_proxy != null
-        && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
+    if (!clientIsDurable && proxy != null && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
       // The startOrResumeMessageDispatcher tests if the proxy is a primary.
       // If this is a secondary proxy, the dispatcher is not started.
       // The false parameter signifies that a marker message has not already been
       // processed. This will generate and send one.
-      l_proxy.startOrResumeMessageDispatcher(false);
+      proxy.startOrResumeMessageDispatcher(false);
     }
 
     if (responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
       if (logger.isDebugEnabled()) {
-        logger.debug("CacheClientNotifier: Successfully registered {}", l_proxy);
+        logger.debug("CacheClientNotifier: Successfully registered {}", proxy);
       }
     } else {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_UNSUCCESSFULLY_REGISTERED_CLIENT_WITH_IDENTIFIER__0,
           proxyId));
     }
-    return l_proxy;
+    return proxy;
   }
 
-  private boolean initializeProxy(CacheClientProxy l_proxy) throws IOException, CacheException {
-    boolean status = false;
-    if (!this.isProxyInInitializationMode(l_proxy)) {
+  private boolean initializeProxy(CacheClientProxy proxy) throws CacheException {
+    if (!this.isProxyInInitializationMode(proxy)) {
       if (logger.isDebugEnabled()) {
-        logger.debug("Initializing proxy: {}", l_proxy);
+        logger.debug("Initializing proxy: {}", proxy);
       }
       try {
         // Add client proxy to initialization list. This has to be done before
         // the queue is created so that events can be buffered here for delivery
         // to the queue once it's initialized (bug #41681 and others)
-        addClientInitProxy(l_proxy);
-        l_proxy.initializeMessageDispatcher();
+        addClientInitProxy(proxy);
+        proxy.initializeMessageDispatcher();
         // Initialization success. Add to client proxy list.
-        addClientProxy(l_proxy);
+        addClientProxy(proxy);
         return true;
       } catch (RegionExistsException ree) {
         if (logger.isDebugEnabled()) {
@@ -624,10 +793,10 @@ public class CacheClientNotifier {
         }
         // This will return false;
       } finally {
-        removeClientInitProxy(l_proxy);
+        removeClientInitProxy(proxy);
       }
     }
-    return status;
+    return false;
   }
 
   /**
@@ -670,9 +839,9 @@ public class CacheClientNotifier {
     boolean success = false;
     CacheClientProxy proxy = getClientProxy(proxyId);
     if (proxy != null) {
-      HARegionQueue harq = proxy.getHARegionQueue();
-      harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()),
-          eid.getSequenceID());
+      HARegionQueue haRegionQueue = proxy.getHARegionQueue();
+      haRegionQueue.addDispatchedMessage(
+          new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()), eid.getSequenceID());
       success = true;
     }
     return success;
@@ -690,11 +859,6 @@ public class CacheClientNotifier {
     }
     CacheClientProxy proxy = getClientProxy(membershipID);
     if (proxy != null) {
-      // Close the port if the proxy represents the client and contains the
-      // port)
-      // // If so, remove the port from the client's remote ports
-      // proxy.removePort(clientPort);
-      // Set the keepalive flag
       proxy.setKeepAlive(keepAlive);
     }
   }
@@ -704,7 +868,7 @@ public class CacheClientNotifier {
    *
    * @param memberId Uniquely identifies the client
    */
-  public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+  void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
     if (logger.isDebugEnabled()) {
       logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
     }
@@ -769,14 +933,16 @@ public class CacheClientNotifier {
    * notify interested clients of the given cache event using the given update message. The event
    * should have routing information in it that determines which clients will receive the event.
    */
-  public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
+  public static void notifyClients(InternalCacheEvent event,
+      ClientUpdateMessage clientUpdateMessage) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      instance.singletonNotifyClients(event, cmsg);
+      instance.singletonNotifyClients(event, clientUpdateMessage);
     }
   }
 
-  private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
+  private void singletonNotifyClients(InternalCacheEvent event,
+      ClientUpdateMessage clientUpdateMessage) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final boolean isTraceEnabled = logger.isTraceEnabled();
 
@@ -796,20 +962,20 @@ public class CacheClientNotifier {
       return;
     }
 
-    long startTime = this._statistics.startTime();
+    long startTime = this.statistics.startTime();
 
     ClientUpdateMessageImpl clientMessage;
-    if (cmsg == null) {
+    if (clientUpdateMessage == null) {
       clientMessage = constructClientMessage(event);
     } else {
-      clientMessage = (ClientUpdateMessageImpl) cmsg;
+      clientMessage = (ClientUpdateMessageImpl) clientUpdateMessage;
     }
     if (clientMessage == null) {
       return;
     }
 
     // Holds the clientIds to which filter message needs to be sent.
-    Set<ClientProxyMembershipID> filterClients = new HashSet();
+    Set<ClientProxyMembershipID> filterClients = new HashSet<>();
 
     // Add CQ info.
     if (filterInfo.getCQs() != null) {
@@ -868,7 +1034,7 @@ public class CacheClientNotifier {
       }
     }
 
-    Conflatable conflatable = null;
+    Conflatable conflatable;
 
     if (clientMessage instanceof ClientTombstoneMessage) {
       // bug #46832 - HAEventWrapper deserialization can't handle subclasses
@@ -893,7 +1059,7 @@ public class CacheClientNotifier {
 
     singletonRouteClientMessage(conflatable, filterClients);
 
-    this._statistics.endEvent(startTime);
+    this.statistics.endEvent(startTime);
 
     // Cleanup destroyed events in CQ result cache.
     // While maintaining the CQ results key caching. the destroy event
@@ -915,7 +1081,7 @@ public class CacheClientNotifier {
         String cqName = regionProfile.getRealCqID(cqID);
         if (cqName != null) {
           ServerCQ cq = regionProfile.getCq(cqName);
-          if (cq != null && e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY))) {
+          if (cq != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
             cq.removeFromCqResultKeys(entryEvent.getKey(), true);
           }
         }
@@ -930,28 +1096,15 @@ public class CacheClientNotifier {
   public static void routeClientMessage(Conflatable clientMessage) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
-                                                                                             // to
-                                                                                             // use
-                                                                                             // keySet
-                                                                                             // here
-                                                                                             // because
-                                                                                             // all
-                                                                                             // we
-                                                                                             // do
-                                                                                             // is
-                                                                                             // call
-                                                                                             // getClientProxy
-                                                                                             // with
-                                                                                             // these
-                                                                                             // keys
+      // ok to use keySet here because all we do is call getClientProxy with these keys
+      instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet());
     }
   }
 
   /**
    * this is for server side registration of client queue
    */
-  public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+  static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
       ClientProxyMembershipID clientProxyMembershipId) {
     CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
@@ -963,27 +1116,25 @@ public class CacheClientNotifier {
   private void singletonRouteClientMessage(Conflatable conflatable,
       Collection<ClientProxyMembershipID> filterClients) {
 
-    this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
-                                                                  // but no p2p distribution
+    this.cache.getCancelCriterion().checkCancelInProgress(null);
 
     List<CacheClientProxy> deadProxies = null;
     for (ClientProxyMembershipID clientId : filterClients) {
-      CacheClientProxy proxy;
-      proxy = this.getClientProxy(clientId, true);
+      CacheClientProxy proxy = this.getClientProxy(clientId, true);
       if (proxy != null) {
         if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) {
           proxy.deliverMessage(conflatable);
         } else {
           proxy.getStatistics().incMessagesFailedQueued();
           if (deadProxies == null) {
-            deadProxies = new ArrayList<CacheClientProxy>();
+            deadProxies = new ArrayList<>();
           }
           deadProxies.add(proxy);
         }
-        this.blackListSlowReciever(proxy);
+        this.blackListSlowReceiver(proxy);
       }
     }
-    checkAndRemoveFromClientMsgsRegion(conflatable);
+    checkAndRemoveFromClientMessagesRegion(conflatable);
     // Remove any dead clients from the clients to notify
     if (deadProxies != null) {
       closeDeadProxies(deadProxies, false);
@@ -994,7 +1145,7 @@ public class CacheClientNotifier {
    * processes the given collection of durable and non-durable client identifiers, returning a
    * collection of non-durable identifiers of clients connected to this VM
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
     return getProxyIDs(mixedDurableAndNonDurableIDs, false);
   }
 
@@ -1003,52 +1154,44 @@ public class CacheClientNotifier {
    * collection of non-durable identifiers of clients connected to this VM. This version can check
    * for proxies in initialization as well as fully initialized proxies.
    */
-  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+  private Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
       boolean proxyInInitMode) {
-    Set<ClientProxyMembershipID> result = new HashSet();
+    Set<ClientProxyMembershipID> result = new HashSet<>();
     for (Object id : mixedDurableAndNonDurableIDs) {
       if (id instanceof String) {
         CacheClientProxy clientProxy = getClientProxy((String) id, true);
         if (clientProxy != null) {
           result.add(clientProxy.getProxyID());
         }
-        // else { we don't have a proxy for the given durable ID }
       } else {
         // try to canonicalize the ID.
         CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID) id, true);
         if (proxy != null) {
-          // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: found match for " + id + ": " +
-          // proxy.getProxyID());
           result.add(proxy.getProxyID());
-        } else {
-          // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: did not find match for " + id);
-          // this was causing OOMEs in HARegion initial image processing because
-          // messages had routing for clients unknown to this server
-          // result.add((ClientProxyMembershipID)id);
         }
       }
     }
     return result;
   }
 
-  private void blackListSlowReciever(CacheClientProxy clientProxy) {
+  private void blackListSlowReceiver(CacheClientProxy clientProxy) {
     final CacheClientProxy proxy = clientProxy;
-    if ((proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever())
-        && !blackListedClients.contains(proxy.getProxyID())) {
+    if (proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever()
+        && !this.blackListedClients.contains(proxy.getProxyID())) {
       // log alert with client info.
       logger.warn(
           LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CLIENT_0_IS_A_SLOW_RECEIVER,
               new Object[] {proxy.getProxyID()}));
       addToBlacklistedClient(proxy.getProxyID());
-      InternalDistributedSystem ids =
-          (InternalDistributedSystem) this.getCache().getDistributedSystem();
-      final DM dm = ids.getDistributionManager();
+      InternalDistributedSystem system = getCache().getInternalDistributedSystem();
+      final DM dm = system.getDistributionManager();
+
       dm.getWaitingThreadPool().execute(new Runnable() {
+        @Override
         public void run() {
 
           CacheDistributionAdvisor advisor =
-              ((DistributedRegion) proxy.getHARegionQueue().getRegion())
-                  .getCacheDistributionAdvisor();
+              proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor();
           Set members = advisor.adviseCacheOp();
 
           // Send client blacklist message
@@ -1074,25 +1217,24 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Initializes a <code>ClientUpdateMessage</code> from an operation and event
+   * Initializes a {@code ClientUpdateMessage} from an operation and event
    *
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
    * @param event The event containing the data to be updated
-   * @return a <code>ClientUpdateMessage</code>
+   * @return a {@code ClientUpdateMessage}
    */
-  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event)
-      throws Exception {
+  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) {
     if (!supportsOperation(operation)) {
-      throw new Exception(
+      throw new UnsupportedOperationException(
           LocalizedStrings.CacheClientNotifier_THE_CACHE_CLIENT_NOTIFIER_DOES_NOT_SUPPORT_OPERATIONS_OF_TYPE_0
               .toLocalizedString(operation));
     }
-    // String regionName = event.getRegion().getFullPath();
+
     Object keyOfInterest = null;
     final EventID eventIdentifier;
     ClientProxyMembershipID membershipID = null;
     boolean isNetLoad = false;
-    Object callbackArgument = null;
+    Object callbackArgument;
     byte[] delta = null;
     VersionTag versionTag = null;
 
@@ -1129,19 +1271,19 @@ public class CacheClientNotifier {
     }
 
     if (isNetLoad) {
-      clientUpdateMsg.setIsNetLoad(isNetLoad);
+      clientUpdateMsg.setIsNetLoad(true);
     }
 
     return clientUpdateMsg;
   }
 
   /**
-   * Returns whether the <code>CacheClientNotifier</code> supports the input operation.
+   * Returns whether the {@code CacheClientNotifier} supports the input operation.
    *
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
-   * @return whether the <code>CacheClientNotifier</code> supports the input operation
+   * @return whether the {@code CacheClientNotifier} supports the input operation
    */
-  protected boolean supportsOperation(EnumListenerEvent operation) {
+  private boolean supportsOperation(EnumListenerEvent operation) {
     return operation == EnumListenerEvent.AFTER_CREATE
         || operation == EnumListenerEvent.AFTER_UPDATE
         || operation == EnumListenerEvent.AFTER_DESTROY
@@ -1211,7 +1353,7 @@ public class CacheClientNotifier {
       int regionDataPolicy) {
     if (regionDataPolicy == 0) {
       if (!regionsWithEmptyDataPolicy.containsKey(regionName)) {
-        regionsWithEmptyDataPolicy.put(regionName, Integer.valueOf(0));
+        regionsWithEmptyDataPolicy.put(regionName, 0);
       }
     }
   }
@@ -1222,8 +1364,8 @@ public class CacheClientNotifier {
    * @param regionName The name of the region of interest
    * @param keyOfInterest The name of the key of interest
    * @param isClosing Whether the caller is closing
-   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
-   *        in this <code>Region</code> and key
+   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+   *        this {@code Region} and key
    */
   public void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType,
       boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1244,8 +1386,8 @@ public class CacheClientNotifier {
    *
    * @param regionName The name of the region of interest
    * @param keysOfInterest The list of keys of interest
-   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
-   *        in this <code>Region</code> and key
+   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+   *        this {@code Region} and key
    */
   public void registerClientInterest(String regionName, List keysOfInterest,
       ClientProxyMembershipID membershipID, boolean isDurable, boolean sendUpdatesAsInvalidates,
@@ -1278,8 +1420,8 @@ public class CacheClientNotifier {
    * @param regionName The name of the region of interest
    * @param keysOfInterest The list of keys of interest
    * @param isClosing Whether the caller is closing
-   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
-   *        in this <code>Region</code> and key
+   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
+   *        this {@code Region} and key
    */
   public void unregisterClientInterest(String regionName, List keysOfInterest, boolean isClosing,
       ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1301,21 +1443,22 @@ public class CacheClientNotifier {
    * 
    * @since GemFire 5.7
    */
-  private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
-    if (haContainer == null) {
+  private void checkAndRemoveFromClientMessagesRegion(Conflatable conflatable) {
+    if (this.haContainer == null) {
       return;
     }
+
     if (conflatable instanceof HAEventWrapper) {
       HAEventWrapper wrapper = (HAEventWrapper) conflatable;
       if (!wrapper.getIsRefFromHAContainer()) {
-        wrapper = (HAEventWrapper) haContainer.getKey(wrapper);
+        wrapper = (HAEventWrapper) this.haContainer.getKey(wrapper);
         if (wrapper != null && !wrapper.getPutInProgress()) {
           synchronized (wrapper) {
             if (wrapper.getReferenceCount() == 0L) {
               if (logger.isDebugEnabled()) {
                 logger.debug("Removing event from haContainer: {}", wrapper);
               }
-              haContainer.remove(wrapper);
+              this.haContainer.remove(wrapper);
             }
           }
         }
@@ -1328,7 +1471,7 @@ public class CacheClientNotifier {
             if (logger.isDebugEnabled()) {
               logger.debug("Removing event from haContainer: {}", wrapper);
             }
-            haContainer.remove(wrapper);
+            this.haContainer.remove(wrapper);
           }
         }
       }
@@ -1336,12 +1479,12 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns the <code>CacheClientProxy</code> associated to the membershipID *
+   * Returns the {@code CacheClientProxy} associated to the membershipID *
    *
-   * @return the <code>CacheClientProxy</code> associated to the membershipID
+   * @return the {@code CacheClientProxy} associated to the membershipID
    */
   public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) {
-    return (CacheClientProxy) this._clientProxies.get(membershipID);
+    return (CacheClientProxy) this.clientProxies.get(membershipID);
   }
 
   /**
@@ -1352,25 +1495,25 @@ public class CacheClientNotifier {
       boolean proxyInInitMode) {
     CacheClientProxy proxy = getClientProxy(membershipID);
     if (proxyInInitMode && proxy == null) {
-      proxy = (CacheClientProxy) this._initClientProxies.get(membershipID);
+      proxy = (CacheClientProxy) this.initClientProxies.get(membershipID);
     }
     return proxy;
   }
 
   /**
-   * Returns the <code>CacheClientProxy</code> associated to the durableClientId
+   * Returns the {@code CacheClientProxy} associated to the durableClientId
    * 
-   * @return the <code>CacheClientProxy</code> associated to the durableClientId
+   * @return the {@code CacheClientProxy} associated to the durableClientId
    */
   public CacheClientProxy getClientProxy(String durableClientId) {
     return getClientProxy(durableClientId, false);
   }
 
   /**
-   * Returns the <code>CacheClientProxy</code> associated to the durableClientId. This version of
-   * the method can check for initializing proxies as well as fully initialized proxies.
+   * Returns the {@code CacheClientProxy} associated to the durableClientId. This version of the
+   * method can check for initializing proxies as well as fully initialized proxies.
    * 
-   * @return the <code>CacheClientProxy</code> associated to the durableClientId
+   * @return the {@code CacheClientProxy} associated to the durableClientId
    */
   public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1379,9 +1522,9 @@ public class CacheClientNotifier {
     if (isDebugEnabled) {
       logger.debug("CacheClientNotifier: Determining client for {}", durableClientId);
     }
+
     CacheClientProxy proxy = null;
-    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
+    for (CacheClientProxy clientProxy : getClientProxies()) {
       if (isTraceEnabled) {
         logger.trace("CacheClientNotifier: Checking client {}", clientProxy);
       }
@@ -1394,9 +1537,10 @@ public class CacheClientNotifier {
         break;
       }
     }
+
     if (proxy == null && proxyInInitMode) {
-      for (Iterator i = this._initClientProxies.values().iterator(); i.hasNext();) {
-        CacheClientProxy clientProxy = (CacheClientProxy) i.next();
+      for (Object clientProxyObject : this.initClientProxies.values()) {
+        CacheClientProxy clientProxy = (CacheClientProxy) clientProxyObject;
         if (isTraceEnabled) {
           logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy);
         }
@@ -1415,37 +1559,6 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
-   * 
-   * @return the <code>CacheClientProxy</code> associated to the same distributed system
-   */
-  public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    if (isDebugEnabled) {
-      logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
-          membershipID);
-      logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
-          this, getClientProxies().size());
-    }
-    CacheClientProxy proxy = null;
-    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
-      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled) {
-        logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
-      }
-      if (clientProxy.isSameDSMember(membershipID)) {
-        proxy = clientProxy;
-        if (isDebugEnabled) {
-          logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
-              membershipID);
-        }
-        break;
-      }
-    }
-    return proxy;
-  }
-
-  /**
    * It will remove the clients connected to the passed acceptorId. If its the only server, shuts
    * down this instance.
    */
@@ -1453,10 +1566,10 @@ public class CacheClientNotifier {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
-          this.getCache().getCacheServers().size());
+          getCache().getCacheServers().size());
     }
 
-    Iterator it = this._clientProxies.values().iterator();
+    Iterator it = this.clientProxies.values().iterator();
     // Close all the client proxies
     while (it.hasNext()) {
       CacheClientProxy proxy = (CacheClientProxy) it.next();
@@ -1478,27 +1591,27 @@ public class CacheClientNotifier {
 
     if (noActiveServer() && ccnSingleton != null) {
       ccnSingleton = null;
-      if (haContainer != null) {
-        haContainer.cleanUp();
+      if (this.haContainer != null) {
+        this.haContainer.cleanUp();
         if (isDebugEnabled) {
-          logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+          logger.debug("haContainer ({}) is now cleaned up.", this.haContainer.getName());
         }
       }
       this.clearCompiledQueries();
-      blackListedClients.clear();
+      this.blackListedClients.clear();
 
       // cancel the ping task
       this.clientPingTask.cancel();
 
       // Close the statistics
-      this._statistics.close();
+      this.statistics.close();
 
       this.socketCloser.close();
     }
   }
 
   private boolean noActiveServer() {
-    for (CacheServer server : this.getCache().getCacheServers()) {
+    for (CacheServer server : getCache().getCacheServers()) {
       if (server.isRunning()) {
         return false;
       }
@@ -1507,41 +1620,40 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Adds a new <code>CacheClientProxy</code> to the list of known client proxies
+   * Adds a new {@code CacheClientProxy} to the list of known client proxies
    *
-   * @param proxy The <code>CacheClientProxy</code> to add
+   * @param proxy The {@code CacheClientProxy} to add
    */
-  protected void addClientProxy(CacheClientProxy proxy) throws IOException {
-    // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
+  void addClientProxy(CacheClientProxy proxy) {
     getCache(); // ensure cache reference is up to date so firstclient state is correct
-    this._clientProxies.put(proxy.getProxyID(), proxy);
+    this.clientProxies.put(proxy.getProxyID(), proxy);
     // Remove this proxy from the init proxy list.
     removeClientInitProxy(proxy);
-    this._connectionListener.queueAdded(proxy.getProxyID());
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    this.connectionListener.queueAdded(proxy.getProxyID());
+    if (proxy.clientConflation != HandShake.CONFLATION_ON) {
       // Delta not supported with conflation ON
-      ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
+      ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
       /*
        * #41788 - If the client connection init starts while cache/member is shutting down,
        * ClientHealthMonitor.getInstance() might return null.
        */
-      if (chm != null) {
-        chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
+      if (clientHealthMonitor != null) {
+        clientHealthMonitor.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
       }
     }
     this.timedOutDurableClientProxies.remove(proxy.getProxyID());
   }
 
-  protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
-    this._initClientProxies.put(proxy.getProxyID(), proxy);
+  private void addClientInitProxy(CacheClientProxy proxy) {
+    this.initClientProxies.put(proxy.getProxyID(), proxy);
   }
 
-  protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
-    this._initClientProxies.remove(proxy.getProxyID());
+  private void removeClientInitProxy(CacheClientProxy proxy) {
+    this.initClientProxies.remove(proxy.getProxyID());
   }
 
-  protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
-    return this._initClientProxies.containsKey(proxy.getProxyID());
+  private boolean isProxyInInitializationMode(CacheClientProxy proxy) {
+    return this.initClientProxies.containsKey(proxy.getProxyID());
   }
 
   /**
@@ -1552,8 +1664,7 @@ public class CacheClientNotifier {
    */
   public Set getActiveClients() {
     Set clients = new HashSet();
-    for (Iterator iter = getClientProxies().iterator(); iter.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) iter.next();
+    for (CacheClientProxy proxy : getClientProxies()) {
       if (proxy.hasRegisteredInterested()) {
         ClientProxyMembershipID proxyID = proxy.getProxyID();
         clients.add(proxyID);
@@ -1569,8 +1680,8 @@ public class CacheClientNotifier {
    */
   public Map getAllClients() {
     Map clients = new HashMap();
-    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) iter.next();
+    for (final Object o : this.clientProxies.values()) {
+      CacheClientProxy proxy = (CacheClientProxy) o;
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       clients.put(proxyID, new CacheClientStatus(proxyID));
     }
@@ -1586,8 +1697,8 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasDurableClient(String durableId) {
-    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) iter.next();
+    for (Object clientProxyObject : this.clientProxies.values()) {
+      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       if (durableId.equals(proxyID.getDurableId())) {
         return true;
@@ -1605,15 +1716,11 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasPrimaryForDurableClient(String durableId) {
-    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) iter.next();
+    for (Object clientProxyObject : this.clientProxies.values()) {
+      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       if (durableId.equals(proxyID.getDurableId())) {
-        if (proxy.isPrimary()) {
-          return true;
-        } else {
-          return false;
-        }
+        return proxy.isPrimary();
       }
     }
     return false;
@@ -1626,9 +1733,9 @@ public class CacheClientNotifier {
    */
   public Map getClientQueueSizes() {
     Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap();
-    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) iter.next();
-      queueSizes.put(proxy.getProxyID(), Integer.valueOf(proxy.getQueueSize()));
+    for (Object clientProxyObject : this.clientProxies.values()) {
+      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
+      queueSizes.put(proxy.getProxyID(), proxy.getQueueSize());
     }
     return queueSizes;
   }
@@ -1645,25 +1752,20 @@ public class CacheClientNotifier {
   public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
     CacheClientProxy proxy = getClientProxy(durableClientId);
     // close and drain
-    if (proxy != null) {
-      return proxy.closeClientCq(clientCQName);
-    }
-    return false;
+    return proxy != null && proxy.closeClientCq(clientCQName);
   }
 
   /**
-   * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
+   * Removes an existing {@code CacheClientProxy} from the list of known client proxies
    *
-   * @param proxy The <code>CacheClientProxy</code> to remove
+   * @param proxy The {@code CacheClientProxy} to remove
    */
-  protected void removeClientProxy(CacheClientProxy proxy) {
-    // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
-    // Exception("stack trace"));
+  void removeClientProxy(CacheClientProxy proxy) {
     ClientProxyMembershipID client = proxy.getProxyID();
-    this._clientProxies.remove(client);
-    this._connectionListener.queueRemoved();
-    this.getCache().cleanupForClient(this, client);
-    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
+    this.clientProxies.remove(client);
+    this.connectionListener.queueRemoved();
+    getCache().cleanupForClient(this, client);
+    if (proxy.clientConflation != HandShake.CONFLATION_ON) {
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       if (chm != null) {
         chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
@@ -1675,18 +1777,18 @@ public class CacheClientNotifier {
     this.timedOutDurableClientProxies.add(client);
   }
 
-  public boolean isTimedOut(ClientProxyMembershipID client) {
+  private boolean isTimedOut(ClientProxyMembershipID client) {
     return this.timedOutDurableClientProxies.contains(client);
   }
 
   /**
-   * Returns an unmodifiable Collection of known <code>CacheClientProxy</code> instances. The
-   * collection is not static so its contents may change.
+   * Returns an unmodifiable Collection of known {@code CacheClientProxy} instances. The collection
+   * is not static so its contents may change.
    *
-   * @return the collection of known <code>CacheClientProxy</code> instances
+   * @return the collection of known {@code CacheClientProxy} instances
    */
   public Collection<CacheClientProxy> getClientProxies() {
-    return Collections.unmodifiableCollection(this._clientProxies.values());
+    return Collections.unmodifiableCollection(this.clientProxies.values());
   }
 
   private void closeAllClientCqs(CacheClientProxy proxy) {
@@ -1698,12 +1800,12 @@ public class CacheClientNotifier {
           logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy);
         }
         cqService.closeClientCqs(proxy.getProxyID());
-      } catch (CqException e1) {
+      } catch (CqException e) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientNotifier_UNABLE_TO_CLOSE_CQS_FOR_THE_CLIENT__0,
             proxy.getProxyID()));
         if (isDebugEnabled) {
-          e1.printStackTrace();
+          logger.debug(e);
         }
       }
     }
@@ -1732,16 +1834,17 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Close dead <code>CacheClientProxy</code> instances
+   * Close dead {@code CacheClientProxy} instances
    *
-   * @param deadProxies The list of <code>CacheClientProxy</code> instances to close
+   * @param deadProxies The list of {@code CacheClientProxy} instances to close
    */
   private void closeDeadProxies(List deadProxies, boolean stoppedNormally) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    for (Iterator i = deadProxies.iterator(); i.hasNext();) {
-      CacheClientProxy proxy = (CacheClientProxy) i.next();
-      if (isDebugEnabled)
+    for (Object deadProxy : deadProxies) {
+      CacheClientProxy proxy = (CacheClientProxy) deadProxy;
+      if (isDebugEnabled) {
         logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
+      }
 
       // Close the proxy
       boolean keepProxy = false;
@@ -1757,8 +1860,7 @@ public class CacheClientNotifier {
       if (keepProxy) {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_KEEPING_PROXY_FOR_DURABLE_CLIENT_NAMED_0_FOR_1_SECONDS_2,
-            new Object[] {proxy.getDurableId(), Integer.valueOf(proxy.getDurableTimeout()),
-                proxy}));
+            new Object[] {proxy.getDurableId(), proxy.getDurableTimeout(), proxy}));
       } else {
         closeAllClientCqs(proxy);
         if (isDebugEnabled) {
@@ -1771,10 +1873,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Registers a new <code>InterestRegistrationListener</code> with the set of
-   * <code>InterestRegistrationListener</code>s.
+   * Registers a new {@code InterestRegistrationListener} with the set of
+   * {@code InterestRegistrationListener}s.
    * 
-   * @param listener The <code>InterestRegistrationListener</code> to register
+   * @param listener The {@code InterestRegistrationListener} to register
    * 
    * @since GemFire 5.8Beta
    */
@@ -1783,10 +1885,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Unregisters an existing <code>InterestRegistrationListener</code> from the set of
-   * <code>InterestRegistrationListener</code>s.
+   * Unregisters an existing {@code InterestRegistrationListener} from the set of
+   * {@code InterestRegistrationListener}s.
    * 
-   * @param listener The <code>InterestRegistrationListener</code> to unregister
+   * @param listener The {@code InterestRegistrationListener} to unregister
    * 
    * @since GemFire 5.8Beta
    */
@@ -1795,11 +1897,11 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns a read-only collection of <code>InterestRegistrationListener</code>s registered with
-   * this notifier.
+   * Returns a read-only collection of {@code InterestRegistrationListener}s registered with this
+   * notifier.
    * 
-   * @return a read-only collection of <code>InterestRegistrationListener</code>s registered with
-   *         this notifier
+   * @return a read-only collection of {@code InterestRegistrationListener}s registered with this
+   *         notifier
    * 
    * @since GemFire 5.8Beta
    */
@@ -1811,17 +1913,17 @@ public class CacheClientNotifier {
    * 
    * @since GemFire 5.8Beta
    */
-  protected boolean containsInterestRegistrationListeners() {
+  boolean containsInterestRegistrationListeners() {
     return !this.writableInterestRegistrationListeners.isEmpty();
   }
 
   /**
-   * 
    * @since GemFire 5.8Beta
    */
-  protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
-    for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
-      InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
+  void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+    for (Object writableInterestRegistrationListener : this.writableInterestRegistrationListeners) {
+      InterestRegistrationListener listener =
+          (InterestRegistrationListener) writableInterestRegistrationListener;
       if (event.isRegister()) {
         listener.afterRegisterInterest(event);
       } else {
@@ -1836,207 +1938,70 @@ public class CacheClientNotifier {
    * @return the statistics for the notifier
    */
   public CacheClientNotifierStats getStats() {
-    return this._statistics;
+    return this.statistics;
   }
 
   /**
-   * Returns this <code>CacheClientNotifier</code>'s <code>InternalCache</code>.
+   * Returns this {@code CacheClientNotifier}'s {@code InternalCache}.
    * 
-   * @return this <code>CacheClientNotifier</code>'s <code>InternalCache</code>
+   * @return this {@code CacheClientNotifier}'s {@code InternalCache}
    */
   protected InternalCache getCache() { // TODO:SYNC: looks wrong
-    if (this._cache != null && this._cache.isClosed()) {
+    if (this.cache != null && this.cache.isClosed()) {
       InternalCache cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
-        this._cache = cache;
+        this.cache = cache;
         this.logWriter = cache.getInternalLogWriter();
         this.securityLogWriter = cache.getSecurityInternalLogWriter();
       }
     }
-    return this._cache;
+    return this.cache;
   }
 
   /**
-   * Returns this <code>CacheClientNotifier</code>'s maximum message count.
+   * Returns this {@code CacheClientNotifier}'s maximum message count.
    * 
-   * @return this <code>CacheClientNotifier</code>'s maximum message count
+   * @return this {@code CacheClientNotifier}'s maximum message count
    */
   protected int getMaximumMessageCount() {
     return this.maximumMessageCount;
   }
 
   /**
-   * Returns this <code>CacheClientNotifier</code>'s message time-to-live.
+   * Returns this {@code CacheClientNotifier}'s message time-to-live.
    * 
-   * @return this <code>CacheClientNotifier</code>'s message time-to-live
+   * @return this {@code CacheClientNotifier}'s message time-to-live
    */
   protected int getMessageTimeToLive() {
     return this.messageTimeToLive;
   }
 
-  protected void handleInterestEvent(InterestRegistrationEvent event) {
+  void handleInterestEvent(InterestRegistrationEvent event) {
     LocalRegion region = (LocalRegion) event.getRegion();
     region.handleInterestEvent(event);
   }
 
-  /**
-   * @param cache The GemFire <code>InternalCache</code>
-   * @param listener a listener which should receive notifications abouts queues being added or
-   *        removed.
-   */
-  private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
-      int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
-      List overflowAttributesList, boolean isGatewayReceiver) {
-    // Set the Cache
-    setCache(cache);
-    this.acceptorStats = acceptorStats;
-    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
-                                                 // for close
-
-    // Set the LogWriter
-    this.logWriter = (InternalLogWriter) cache.getLogger();
-
-    this._connectionListener = listener;
-
-    // Set the security LogWriter
-    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
-    this.maximumMessageCount = maximumMessageCount;
-    this.messageTimeToLive = messageTimeToLive;
-
-    // Initialize the statistics
-    StatisticsFactory factory;
-    if (isGatewayReceiver) {
-      factory = new DummyStatisticsFactory();
-    } else {
-      factory = this.getCache().getDistributedSystem();
-    }
-    this._statistics = new CacheClientNotifierStats(factory);
-
-    try {
-      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
-      if (this.logFrequency <= 0) {
-        this.logFrequency = DEFAULT_LOG_FREQUENCY;
-      }
-    } catch (Exception e) {
-      this.logFrequency = DEFAULT_LOG_FREQUENCY;
-    }
-
-    eventEnqueueWaitTime =
-        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
-    if (eventEnqueueWaitTime < 0) {
-      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
-    }
-
-    // Schedule task to periodically ping clients.
-    scheduleClientPingTask();
-  }
-
-  /**
-   * this message is used to send interest registration to another server. Since interest
-   * registration performs a state-flush operation this message must not transmitted on an ordered
-   * socket
-   */
-  public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
-      implements MessageWithReply {
-    ClientProxyMembershipID clientId;
-    ClientInterestMessageImpl clientMessage;
-    int processorId;
-
-    ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
-        ClientInterestMessageImpl msg) {
-      this.clientId = clientID;
-      this.clientMessage = msg;
-    }
-
-    public ServerInterestRegistrationMessage() {}
-
-    static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
-        ClientInterestMessageImpl msg) {
-      ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage(clientID, msg);
-      Set recipients = dm.getOtherDistributionManagerIds();
-      smsg.setRecipients(recipients);
-      ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
-      smsg.processorId = rp.getProcessorId();
-      dm.putOutgoing(smsg);
-      try {
-        rp.waitForReplies();
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    @Override
-    protected void process(DistributionManager dm) {
-      // Get the proxy for the proxy id
-      try {
-        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-        if (ccn != null) {
-          CacheClientProxy proxy = ccn.getClientProxy(clientId);
-          // If this VM contains a proxy for the requested proxy id, forward the
-          // message on to the proxy for processing
-          if (proxy != null) {
-            proxy.processInterestMessage(this.clientMessage);
-          }
-        }
-      } finally {
-        ReplyMessage reply = new ReplyMessage();
-        reply.setProcessorId(this.processorId);
-        reply.setRecipient(getSender());
-        try {
-          dm.putOutgoing(reply);
-        } catch (CancelException e) {
-          // can't send a reply, so ignore the exception
-        }
-      }
-    }
-
-    public int getDSFID() {
-      return SERVER_INTEREST_REGISTRATION_MESSAGE;
-    }
-
-    @Override
-    public void toData(DataOutput out) throws IOException {
-      super.toData(out);
-      out.writeInt(this.processorId);
-      InternalDataSerializer.invokeToData(this.clientId, out);
-      InternalDataSerializer.invokeToData(this.clientMessage, out);
-    }
-
-    @Override
-    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-      super.fromData(in);
-      this.processorId = in.readInt();
-      this.clientId = new ClientProxyMembershipID();
-      InternalDataSerializer.invokeFromData(this.clientId, in);
-      this.clientMessage = new ClientInterestMessageImpl();
-      InternalDataSerializer.invokeFromData(this.clientMessage, in);
-    }
-  }
-
-  protected void deliverInterestChange(ClientProxyMembershipID proxyID,
-      ClientInterestMessageImpl message) {
-    DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
-        .getDistributionManager();
+  void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) {
+    DM dm = getCache().getInternalDistributedSystem().getDistributionManager();
     ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message);
   }
 
-  public CacheServerStats getAcceptorStats() {
+  CacheServerStats getAcceptorStats() {
     return this.acceptorStats;
   }
 
-  public SocketCloser getSocketCloser() {
+  SocketCloser getSocketCloser() {
     return this.socketCloser;
   }
 
   public void addCompiledQuery(DefaultQuery query) {
     if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null) {
       // Added successfully.
-      this._statistics.incCompiledQueryCount(1);
+      this.statistics.incCompiledQueryCount(1);
       if (logger.isDebugEnabled()) {
         logger.debug(
             "Added compiled query into ccn.compliedQueries list. Query: {}. Total compiled queries: {}",
-            query.getQueryString(), this._statistics.getCompiledQueryCount());
+            query.getQueryString(), this.statistics.getCompiledQueryCount());
       }
       // Start the clearIdleCompiledQueries thread.
       startCompiledQueryCleanupThread();
@@ -2048,13 +2013,13 @@ public class CacheClientNotifier {
   }
 
   private void clearCompiledQueries() {
-    if (this.compiledQueries.size() > 0) {
-      this._statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
+    if (!this.compiledQueries.isEmpty()) {
+      this.statistics.incCompiledQueryCount(-this.compiledQueries.size());
       this.compiledQueries.clear();
       if (logger.isDebugEnabled()) {
         logger.debug(
             "Removed all compiled queries from ccn.compliedQueries list. Total compiled queries: {}",
-            this._statistics.getCompiledQueryCount());
+            this.statistics.getCompiledQueryCount());
       }
     }
   }
@@ -2064,7 +2029,7 @@ public class CacheClientNotifier {
    * checks for the compiled queries that are not used and removes them.
    */
   private void startCompiledQueryCleanupThread() {
-    if (isCompiledQueryCleanupThreadStarted) {
+    if (this.isCompiledQueryCleanupThreadStarted) {
       return;
     }
 
@@ -2082,11 +2047,11 @@ public class CacheClientNotifier {
           } else {
             if (compiledQueries.remove(e.getKey()) != null) {
               // If successfully removed decrement the counter.
-              _statistics.incCompiledQueryCount(-1);
+              statistics.incCompiledQueryCount(-1);
               if (isDebugEnabled) {
                 logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
                     + q.getQueryString() + ". Total compiled queries are : "
-                    + _statistics.getCompiledQueryCount());
+                    + statistics.getCompiledQueryCount());
               }
             }
           }
@@ -2094,23 +2059,23 @@ public class CacheClientNotifier {
       }
     };
 
-    synchronized (lockIsCompiledQueryCleanupThreadStarted) {
-      if (!isCompiledQueryCleanupThreadStarted) {
+    synchronized (this.lockIsCompiledQueryCleanupThreadStarted) {
+      if (!this.isCompiledQueryCleanupThreadStarted) {
         long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0
             ? DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME : DefaultQuery.COMPILED_QUERY_CLEAR_TIME;
-        _cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
+        this.cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
       }
-      isCompiledQueryCleanupThreadStarted = true;
+      this.isCompiledQueryCleanupThreadStarted = true;
     }
   }
 
-  protected void scheduleClientPingTask() {
+  void scheduleClientPingTask() {
     this.clientPingTask = new SystemTimer.SystemTimerTask() {
 
       @Override
       public void run2() {
         // If there are no proxies, return
-        if (CacheClientNotifier.this._clientProxies.isEmpty()) {
+        if (clientProxies.isEmpty()) {
           return;
         }
 
@@ -2145,144 +2110,10 @@ public class CacheClientNotifier {
     if (logger.isDebugEnabled()) {
       logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD);
     }
-    CacheClientNotifier.this._cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
+    CacheClientNotifier.this.cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
         CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD);
   }
 
-  /**
-   * A string representing all hosts used for delivery purposes.
-   */
-  protected static final String ALL_HOSTS = "ALL_HOSTS";
-
-  /**
-   * An int representing all ports used for delivery purposes.
-   */
-  protected static final int ALL_PORTS = -1;
-
-  /**
-   * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
-   * CacheClientProxy. Note that the keys in this map are not updated when a durable client
-   * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
-   * CacheClientProxy and then call getProxyID on it.
-   */
-  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _clientProxies =
-      new ConcurrentHashMap();
-
-  /**
-   * The map of <code>CacheClientProxy</code> instances which are getting initialized. Maps
-   * ClientProxyMembershipID to CacheClientProxy.
-   */
-  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _initClientProxies =
-      new ConcurrentHashMap();
-
-  private final HashSet<ClientProxyMembershipID> timedOutDurableClientProxies =
-      new HashSet<ClientProxyMembershipID>();
-
-  /**
-   * The GemFire <code>InternalCache</code>. Note that since this is a singleton class you should
-   * not use a direct reference to _cache in CacheClientNotifier code. Instead, you should always
-   * use <code>getCache()</code>
-   */
-  private InternalCache _cache;
-
-  private InternalLogWriter logWriter;
-
-  /**
-   * The GemFire security <code>LogWriter</code>
-   */
-  private InternalLogWriter securityLogWriter;
-
-  /** the maximum number of messages that can be enqueued in a client-queue. */
-  private int maximumMessageCount;
-
-  /**
-   * the time (in seconds) after which a message in the client queue will expire.
-   */
-  private int messageTimeToLive;
-
-  /**
-   * A listener which receives notifications about queues that are added or removed
-   */
-  private ConnectionListener _connectionListener;
-
-  private CacheServerStats acceptorStats;
-
-  /**
-   * haContainer can hold either the name of the client-messages-region (in case of eviction
-   * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
-   * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
-   */
-  private volatile HAContainerWrapper haContainer;
-
-  /**
-   * The size of the server-to-client communication socket buffers. This can be modified using the
-   * BridgeServer.SOCKET_BUFFER_SIZE system property.
-   */
-  static final private int socketBufferSize =
-      Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
-
-  /**
-   * The statistics for this notifier
-   */
-  protected final CacheClientNotifierStats _statistics;
-
-  /**
-   * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used
-   * when modifying the set of listeners.
-   */
-  private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
-
-  /**
-   * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used to
-   * provide a read-only <code>Set</code> of listeners.
-   */
-  private final Set readableInterestRegistrationListeners =
-      Collections.unmodifiableSet(writableInterestRegistrationListeners);
-
-  /**
-   * System property name for indicating how much frequently the "Queue full" message should be
-   * logged.
-   */
-  public static final String MAX_QUEUE_LOG_FREQUENCY =
-      DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
-
-  public static final long DEFAULT_LOG_FREQUENCY = 1000;
-
-  public static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
-      DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
-
-  public static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
-
-  /**
-   * System property value denoting the time in milliseconds. Any thread putting an event into a
-   * subscription queue, which is full, will wait this much time for the queue to make space. It'll
-   * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
-   * #51400.
-   */
-  public static int eventEnqueueWaitTime;
-
-  /**
-   * The frequency of logging the "Queue full" message.
-   */
-  private long logFrequency = DEFAULT_LOG_FREQUENCY;
-
-  private final ConcurrentHashMap<String, DefaultQuery> compiledQueries =
-      new ConcurrentHashMap<String, DefaultQuery>();
-
-  private volatile boolean isCompiledQueryCleanupThreadStarted = false;
-
-  private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
-
-  private SystemTimer.SystemTimerTask clientPingTask;
-
-  private final SocketCloser socketCloser;
-
-  private static final long CLIENT_PING_TASK_PERIOD =
-      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
-
-  private static final long CLIENT_PING_TASK_COUNTER =
-      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
-
   public long getLogFrequency() {
     return this.logFrequency;
   }
@@ -2291,64 +2122,153 @@ public class CacheClientNotifier {
    * @return the haContainer
    */
   public Map getHaContainer() {
-    return haContainer;
+    return this.haContainer;
   }
 
-  public void initHaContainer(List overflowAttributesList) {
+  private void initHaContainer(List overflowAttributesList) {
     // lazily initialize haContainer in case this CCN instance was created by a gateway receiver
     if (overflowAttributesList != null
         && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) {
-      haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
-          + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0),
-              ((Integer) overflowAttributesList.get(1)).intValue(),
-              ((Integer) overflowAttributesList.get(2)).intValue(),
+      this.haContainer = new HAContainerRegion(this.cache.getRegion(Region.SEPARATOR
+          + CacheServerImpl.clientMessagesRegion(this.cache, (String) overflowAttributesList.get(0),
+              (Integer) overflowAttributesList.get(1), (Integer) overflowAttributesList.get(2),
               (String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4))));
     } else {
-      haContainer = new HAContainerMap(new ConcurrentHashMap());
+      this.haContainer = new HAContainerMap(new ConcurrentHashMap());
     }
-    assert haContainer != null;
+    assert this.haContainer != null;
 
     if (logger.isDebugEnabled()) {
-      logger.debug("ha container ({}) has been created.", haContainer.getName());
+      logger.debug("ha container ({}) has been created.", this.haContainer.getName());
     }
   }
 
-  private final Set blackListedClients = new CopyOnWriteArraySet();
-
-  public void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
-    blackListedClients.add(proxyID);
+  void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
+    this.blackListedClients.add(proxyID);
     // ensure that cache and distributed system state are current and open
-    this.getCache();
+    getCache();
     new ScheduledThreadPoolExecutor(1).schedule(new ExpireBlackListTask(proxyID), 120,
         TimeUnit.SECONDS);
   }
 
-  public Set getBlacklistedClient() {
-    return blackListedClients;
+  Set getBlacklistedClient() {
+    return this.blackListedClients;
   }
 
   /**
-   * @param _cache the _cache to set
+   * @param cache the cache to set
    */
-  private void setCache(InternalCache _cache) {
-    this._cache = _cache;
+  private void setCache(InternalCache cache) {
+    this.cache = cache;
   }
 
+  /**
+   * Non-static inner class ExpireBlackListTask
+   */
   private class ExpireBlackListTask extends PoolTask {
-    private ClientProxyMembershipID proxyID;
+    private final ClientProxyMembershipID proxyID;
 
-    public ExpireBlackListTask(ClientProxyMembershipID proxyID) {
+    ExpireBlackListTask(ClientProxyMembershipID proxyID) {
       this.proxyID = proxyID;
     }
 
     @Override
     public void run2() {
-      if (blackListedClients.remove(proxyID)) {
+      if (blackListedClients.remove(this.proxyID)) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{} client is no longer blacklisted", proxyID);
+          logger.debug("{} client is no longer blacklisted", this.proxyID);
         }
       }
     }
   }
+
+  /**
+   * Static inner-class ServerInterestRegistrationMessage
+   * <p>
+   * this message is used to send interest registration to another server. Since interest
+   * registration performs a state-flush operation this message must not transmitted on an ordered
+   * socket
+   */
+  public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
+      implements MessageWithReply {
+
+    ClientProxyMembershipID clientId;
+    ClientInterestMessageImpl clientMessage;
+    int processorId;
+
+    ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
+        ClientInterestMessageImpl msg) {
+      this.clientId = clientID;
+      this.clientMessage = msg;
+    }
+
+    public ServerInterestRegistrationMessage() {
+      // nothing
+    }
+
+    static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
+        ClientInterestMessageImpl msg) {
+      ServerInterestRegistrationMessage registrationMessage =
+          new ServerInterestRegistrationMessage(clientID, msg);
+      Set recipients = dm.getOtherDistributionManagerIds();
+      registrationMessage.setRecipients(recipients);
+      ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
+      registrationMessage.processorId = rp.getProcessorId();
+      dm.putOutgoing(registrationMessage);
+      try {
+        rp.waitForReplies();
+      } catch (InterruptedException ignore) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    protected void process(DistributionManager dm) {
+      // Get the proxy for the proxy id
+      try {
+        CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+        if (clientNotifier != null) {
+          CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId);
+          // If this VM contains a proxy for the requested proxy id, forward the
+          // message on to the proxy f

<TRUNCATED>

Mime
View raw message