geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [1/2] geode git commit: Fix BlockingHARegionJUnitTest
Date Wed, 31 May 2017 16:58:14 GMT
Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632-18 283215f9f -> 9c18bcc3f


http://git-wip-us.apache.org/repos/asf/geode/blob/9c18bcc3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 4c19df2..1ad70a1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -17,7 +17,9 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.distributed.ConfigurationProperties.*;
 
 import java.io.BufferedOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -68,7 +70,12 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.ClassLoadUtil;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.statistics.DummyStatisticsFactory;
@@ -86,6 +93,7 @@ import org.apache.geode.internal.cache.ClientRegionEventImpl;
 import org.apache.geode.internal.cache.ClientServerObserver;
 import org.apache.geode.internal.cache.ClientServerObserverHolder;
 import org.apache.geode.internal.cache.Conflatable;
+import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
@@ -112,131 +120,29 @@ import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
 
 /**
- * Class {@code CacheClientNotifier} works on the server and manages client socket connections to
- * clients requesting notification of updates and notifies them when updates occur.
+ * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections
+ * to clients requesting notification of updates and notifies them when updates occur.
  *
  * @since GemFire 3.2
  */
+@SuppressWarnings({"synthetic-access", "deprecation"})
 public class CacheClientNotifier {
   private static final Logger logger = LogService.getLogger();
 
-  /**
-   * The size of the server-to-client communication socket buffers. This can be modified using the
-   * BridgeServer.SOCKET_BUFFER_SIZE system property.
-   */
-  private static final int socketBufferSize =
-      Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
-
-  private static final long CLIENT_PING_TASK_PERIOD =
-      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
-
-  /**
-   * package-private to avoid synthetic accessor
-   */
-  static final long CLIENT_PING_TASK_COUNTER =
-      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
-
   private static volatile CacheClientNotifier ccnSingleton;
 
   /**
-   * The map of known {@code CacheClientProxy} instances. Maps ClientProxyMembershipID to
-   * CacheClientProxy. Note that the keys in this map are not updated when a durable client
-   * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
-   * CacheClientProxy and then call getProxyID on it.
-   * <p>
-   * NOTE: package-private to avoid synthetic accessor
-   */
-  final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ clientProxies =
-      new ConcurrentHashMap();
-
-  /**
-   * The map of {@code CacheClientProxy} instances which are getting initialized. Maps
-   * ClientProxyMembershipID to CacheClientProxy.
-   */
-  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ initClientProxies =
-      new ConcurrentHashMap();
-
-  private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>();
-
-  /** the maximum number of messages that can be enqueued in a client-queue. */
-  private final int maximumMessageCount;
-
-  /**
-   * the time (in seconds) after which a message in the client queue will expire.
-   */
-  private final int messageTimeToLive;
-
-  /**
-   * A listener which receives notifications about queues that are added or removed
-   */
-  private final ConnectionListener connectionListener;
-
-  private final CacheServerStats acceptorStats;
-
-  /**
-   * The statistics for this notifier
-   */
-  final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy
-
-  /**
-   * The {@code InterestRegistrationListener} instances registered in this VM. This is used when
-   * modifying the set of listeners.
-   */
-  private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
-
-  /**
-   * The {@code InterestRegistrationListener} instances registered in this VM. This is used to
-   * provide a read-only {@code Set} of listeners.
-   */
-  private final Set readableInterestRegistrationListeners =
-      Collections.unmodifiableSet(this.writableInterestRegistrationListeners);
-
-  private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>();
-
-  private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
-
-  private final SocketCloser socketCloser;
-
-  /** package-private to avoid synthetic accessor */
-  final Set blackListedClients = new CopyOnWriteArraySet();
-
-  /**
-   * haContainer can hold either the name of the client-messages-region (in case of eviction
-   * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
-   * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
-   */
-  private volatile HAContainerWrapper haContainer;
-
-  private volatile boolean isCompiledQueryCleanupThreadStarted = false;
-
-  /**
-   * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use
-   * a direct reference to cache in CacheClientNotifier code. Instead, you should always use
-   * {@code getCache()}
-   */
-  private InternalCache cache; // TODO: fix synchronization of cache
-
-  private InternalLogWriter logWriter;
-
-  /**
-   * The GemFire security {@code LogWriter}
-   */
-  private InternalLogWriter securityLogWriter;
-
-  private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask
-
-  /**
-   * Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
+   * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance.
    *
-   * @param cache The GemFire {@code InternalCache}
-   * @return A {@code CacheClientNotifier} instance
+   * @param cache The GemFire <code>InternalCache</code>
+   * @return A <code>CacheClientNotifier</code> instance
    */
   public static synchronized CacheClientNotifier getInstance(InternalCache cache,
       CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive,
       ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount,
-          messageTimeToLive, listener, isGatewayReceiver);
+          messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
     }
 
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -252,57 +158,20 @@ public class CacheClientNotifier {
   }
 
   /**
-   * @param cache The GemFire {@code InternalCache}
-   * @param listener a listener which should receive notifications abouts queues being added or
-   *        removed.
-   */
-  private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
-      int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
-      boolean isGatewayReceiver) {
-    // Set the Cache
-    setCache(cache);
-    this.acceptorStats = acceptorStats;
-    // we only need one thread per client and wait 50ms for close
-    this.socketCloser = new SocketCloser(1, 50);
-
-    // Set the LogWriter
-    this.logWriter = (InternalLogWriter) cache.getLogger();
-
-    this.connectionListener = listener;
-
-    // Set the security LogWriter
-    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
-
-    this.maximumMessageCount = maximumMessageCount;
-    this.messageTimeToLive = messageTimeToLive;
-
-    // Initialize the statistics
-    StatisticsFactory factory;
-    if (isGatewayReceiver) {
-      factory = new DummyStatisticsFactory();
-    } else {
-      factory = getCache().getDistributedSystem();
-    }
-    this.statistics = new CacheClientNotifierStats(factory);
-
-    // Schedule task to periodically ping clients.
-    scheduleClientPingTask();
-  }
-
-  /**
    * Writes a given message to the output stream
    *
-   * @param dos the {@code DataOutputStream} to use for writing the message
+   * @param dos the <code>DataOutputStream</code> to use for writing the message
    * @param type a byte representing the message type
-   * @param message the message to be written; can be null
+   * @param p_msg the message to be written; can be null
    */
-  private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion)
+  private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion)
       throws IOException {
-    writeMessage(dos, type, message, clientVersion, (byte) 0x00, 0);
+    writeMessage(dos, type, p_msg, clientVersion, (byte) 0x00, 0);
   }
 
-  private void writeMessage(DataOutputStream dos, byte type, String message, Version clientVersion,
+  private void writeMessage(DataOutputStream dos, byte type, String p_msg, Version clientVersion,
       byte epType, int qSize) throws IOException {
+    String msg = p_msg;
 
     // write the message type
     dos.writeByte(type);
@@ -312,7 +181,6 @@ public class CacheClientNotifier {
     // dummy qSize
     dos.writeInt(qSize);
 
-    String msg = message;
     if (msg == null) {
       msg = "";
     }
@@ -320,10 +188,10 @@ public class CacheClientNotifier {
     if (clientVersion != null && clientVersion.compareTo(Version.GFE_61) >= 0) {
       // get all the instantiators.
       Instantiator[] instantiators = InternalInstantiator.getInstantiators();
-      Map instantiatorMap = new HashMap();
+      HashMap instantiatorMap = new HashMap();
       if (instantiators != null && instantiators.length > 0) {
         for (Instantiator instantiator : instantiators) {
-          List<String> instantiatorAttributes = new ArrayList<>();
+          ArrayList instantiatorAttributes = new ArrayList();
           instantiatorAttributes.add(instantiator.getClass().toString().substring(6));
           instantiatorAttributes.add(instantiator.getInstantiatedClass().toString().substring(6));
           instantiatorMap.put(instantiator.getId(), instantiatorAttributes);
@@ -333,14 +201,15 @@ public class CacheClientNotifier {
 
       // get all the dataserializers.
       DataSerializer[] dataSerializers = InternalDataSerializer.getSerializers();
-      Map<Integer, List<String>> dsToSupportedClasses = new HashMap<>();
-      Map<Integer, String> dataSerializersMap = new HashMap<>();
+      HashMap<Integer, ArrayList<String>> dsToSupportedClasses =
+          new HashMap<Integer, ArrayList<String>>();
+      HashMap<Integer, String> dataSerializersMap = new HashMap<Integer, String>();
       if (dataSerializers != null && dataSerializers.length > 0) {
         for (DataSerializer dataSerializer : dataSerializers) {
           dataSerializersMap.put(dataSerializer.getId(),
               dataSerializer.getClass().toString().substring(6));
           if (clientVersion.compareTo(Version.GFE_6516) >= 0) {
-            List<String> supportedClassNames = new ArrayList<>();
+            ArrayList<String> supportedClassNames = new ArrayList<String>();
             for (Class clazz : dataSerializer.getSupportedClasses()) {
               supportedClassNames.add(clazz.getName());
             }
@@ -359,7 +228,7 @@ public class CacheClientNotifier {
   /**
    * Writes an exception message to the socket
    *
-   * @param dos the {@code DataOutputStream} to use for writing the message
+   * @param dos the <code>DataOutputStream</code> to use for writing the message
    * @param type a byte representing the exception type
    * @param ex the exception to be written; should not be null
    */
@@ -392,7 +261,7 @@ public class CacheClientNotifier {
       SocketAddress sa = socket.getRemoteSocketAddress();
       UnsupportedVersionException uve = e;
       if (sa != null) {
-        String sInfo = " Client: " + sa + ".";
+        String sInfo = " Client: " + sa.toString() + ".";
         uve = new UnsupportedVersionException(e.getMessage() + sInfo);
       }
       logger.warn(
@@ -403,7 +272,8 @@ public class CacheClientNotifier {
       return;
     }
 
-    // Read and ignore the reply code. This is used on the client to server handshake.
+    // Read and ignore the reply code. This is used on the client to server
+    // handshake.
     dis.readByte(); // replyCode
 
     if (Version.GFE_57.compareTo(clientVersion) <= 0) {
@@ -419,7 +289,7 @@ public class CacheClientNotifier {
     }
   }
 
-  private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
+  protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket,
       boolean isPrimary, long startTime, Version clientVersion, long acceptorId,
       boolean notifyBySubscription) throws IOException {
     // Read the ports and throw them away. We no longer need them
@@ -429,6 +299,9 @@ public class CacheClientNotifier {
     }
     // Read the handshake identifier and convert it to a string member id
     ClientProxyMembershipID proxyID = null;
+    CacheClientProxy proxy;
+    AccessControl authzCallback = null;
+    byte clientConflation = HandShake.CONFLATION_DEFAULT;
     try {
       proxyID = ClientProxyMembershipID.readCanonicalized(dis);
       if (getBlacklistedClient().contains(proxyID)) {
@@ -436,14 +309,13 @@ public class CacheClientNotifier {
             new Exception("This client is blacklisted by server"), clientVersion);
         return;
       }
-      CacheClientProxy proxy = getClientProxy(proxyID);
+      proxy = getClientProxy(proxyID);
       DistributedMember member = proxyID.getDistributedMember();
 
-      DistributedSystem system = getCache().getDistributedSystem();
+      DistributedSystem system = this.getCache().getDistributedSystem();
       Properties sysProps = system.getProperties();
       String authenticator = sysProps.getProperty(SECURITY_CLIENT_AUTHENTICATOR);
 
-      byte clientConflation;
       if (clientVersion.compareTo(Version.GFE_603) >= 0) {
         byte[] overrides = HandShake.extractOverrides(new byte[] {(byte) dis.read()});
         clientConflation = overrides[0];
@@ -467,23 +339,27 @@ public class CacheClientNotifier {
 
       Properties credentials = HandShake.readCredentials(dis, dos, system);
       if (credentials != null && proxy != null) {
-        if (this.securityLogWriter.fineEnabled()) {
-          this.securityLogWriter
+        if (securityLogWriter.fineEnabled()) {
+          securityLogWriter
               .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID);
         }
         Object subject = HandShake.verifyCredentials(authenticator, credentials,
             system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member);
         if (subject instanceof Principal) {
           Principal principal = (Principal) subject;
-          if (this.securityLogWriter.fineEnabled()) {
-            this.securityLogWriter
+          if (securityLogWriter.fineEnabled()) {
+            securityLogWriter
                 .fine("CacheClientNotifier: successfully verified credentials for proxyID: "
                     + proxyID + " having principal: " + principal.getName());
           }
 
           String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-          AccessControl authzCallback = null;
-          if (postAuthzFactoryName != null && !postAuthzFactoryName.isEmpty()) {
+          if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) {
+            if (principal == null) {
+              securityLogWriter.warning(
+                  LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1,
+                  new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID});
+            }
             Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName);
             authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null);
             authzCallback.init(principal, member, this.getCache());
@@ -498,13 +374,13 @@ public class CacheClientNotifier {
           LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0
               .toLocalizedString(e));
     } catch (AuthenticationRequiredException ex) {
-      this.securityLogWriter.warning(
+      securityLogWriter.warning(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
           new Object[] {proxyID, ex});
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion);
       return;
     } catch (AuthenticationFailedException ex) {
-      this.securityLogWriter.warning(
+      securityLogWriter.warning(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
           new Object[] {proxyID, ex});
       writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion);
@@ -513,10 +389,11 @@ public class CacheClientNotifier {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_0_REGISTERCLIENT_EXCEPTION_ENCOUNTERED_IN_REGISTRATION_1,
           new Object[] {this, e}), e);
-      throw new IOException(
+      IOException io = new IOException(
           LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_TRYING_TO_REGISTER_INTEREST_DUE_TO_0
-              .toLocalizedString(e.getMessage()),
-          e);
+              .toLocalizedString(e.getMessage()));
+      io.initCause(e);
+      throw io;
     } catch (Exception ex) {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1,
@@ -533,13 +410,14 @@ public class CacheClientNotifier {
    *
    * @param socket The socket over which the server communicates with the client.
    * @param proxyId The distributed member id of the client being registered
-   * @param proxy The {@code CacheClientProxy} of the given {@code proxyId}
+   * @param proxy The <code>CacheClientProxy</code> of the given <code>proxyId</code>
    *
    * @return CacheClientProxy for the registered client
    */
   private CacheClientProxy registerClient(Socket socket, ClientProxyMembershipID proxyId,
       CacheClientProxy proxy, boolean isPrimary, byte clientConflation, Version clientVersion,
       long acceptorId, boolean notifyBySubscription) throws IOException, CacheException {
+    CacheClientProxy l_proxy = proxy;
 
     // Initialize the socket
     socket.setTcpNoDelay(true);
@@ -553,6 +431,9 @@ public class CacheClientNotifier {
     }
 
     // Determine whether the client is durable or not.
+    byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
+    String unsuccessfulMsg = null;
+    boolean successful = true;
     boolean clientIsDurable = proxyId.isDurable();
     if (logger.isDebugEnabled()) {
       if (clientIsDurable) {
@@ -565,11 +446,8 @@ public class CacheClientNotifier {
 
     byte epType = 0x00;
     int qSize = 0;
-    byte responseByte = Acceptor.SUCCESSFUL_SERVER_TO_CLIENT;
-    String unsuccessfulMsg = null;
-    boolean successful = true;
     if (clientIsDurable) {
-      if (proxy == null) {
+      if (l_proxy == null) {
         if (isTimedOut(proxyId)) {
           qSize = PoolImpl.PRIMARY_QUEUE_TIMED_OUT;
         } else {
@@ -581,9 +459,9 @@ public class CacheClientNotifier {
               "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.",
               proxyId.getDurableId());
         }
-        proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
-        successful = this.initializeProxy(proxy);
+        successful = this.initializeProxy(l_proxy);
       } else {
         if (proxy.isPrimary()) {
           epType = (byte) 2;
@@ -592,27 +470,27 @@ public class CacheClientNotifier {
         }
         qSize = proxy.getQueueSize();
         // A proxy exists for this durable client. It must be reinitialized.
-        if (proxy.isPaused()) {
+        if (l_proxy.isPaused()) {
           if (CacheClientProxy.testHook != null) {
             CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT");
           }
-          if (proxy.lockDrain()) {
+          if (l_proxy.lockDrain()) {
             try {
               if (logger.isDebugEnabled()) {
                 logger.debug(
                     "CacheClientNotifier: A proxy exists for durable client with id {}. This proxy will be reinitialized: {}",
-                    proxyId.getDurableId(), proxy);
+                    proxyId.getDurableId(), l_proxy);
               }
               this.statistics.incDurableReconnectionCount();
-              proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
-              proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
+              l_proxy.getProxyID().updateDurableTimeout(proxyId.getDurableTimeout());
+              l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation,
                   clientVersion);
-              proxy.setMarkerEnqueued(true);
+              l_proxy.setMarkerEnqueued(true);
               if (CacheClientProxy.testHook != null) {
                 CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED");
               }
             } finally {
-              proxy.unlockDrain();
+              l_proxy.unlockDrain();
             }
           } else {
             unsuccessfulMsg =
@@ -629,7 +507,7 @@ public class CacheClientNotifier {
           // client is already using this durable id.
           unsuccessfulMsg =
               LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_THE_REQUESTED_DURABLE_CLIENT_HAS_THE_SAME_IDENTIFIER__0__AS_AN_EXISTING_DURABLE_CLIENT__1__DUPLICATE_DURABLE_CLIENTS_ARE_NOT_ALLOWED
-                  .toLocalizedString(proxyId.getDurableId(), proxy);
+                  .toLocalizedString(new Object[] {proxyId.getDurableId(), proxy});
           logger.warn(unsuccessfulMsg);
           // Set the unsuccessful response byte.
           responseByte = HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT;
@@ -659,18 +537,18 @@ public class CacheClientNotifier {
 
       if (toCreateNewProxy) {
         // Create the new proxy for this non-durable client
-        proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
+        l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation,
             clientVersion, acceptorId, notifyBySubscription);
-        successful = this.initializeProxy(proxy);
+        successful = this.initializeProxy(l_proxy);
       }
     }
 
     if (!successful) {
-      proxy = null;
+      l_proxy = null;
       responseByte = HandShake.REPLY_REFUSED;
       unsuccessfulMsg =
           LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_A_PREVIOUS_CONNECTION_ATTEMPT_FROM_THIS_CLIENT_IS_STILL_BEING_PROCESSED__0
-              .toLocalizedString(proxyId);
+              .toLocalizedString(new Object[] {proxyId});
     }
 
     // Tell the client that the proxy has been registered using the response
@@ -684,10 +562,10 @@ public class CacheClientNotifier {
       // write the message type, message length and the error message (if any)
       writeMessage(dos, responseByte, unsuccessfulMsg, clientVersion, epType, qSize);
     } catch (IOException ioe) {// remove the added proxy if we get IOException.
-      if (proxy != null) {
-        boolean keepProxy = proxy.close(false, false); // do not check for queue, just close it
+      if (l_proxy != null) {
+        boolean keepProxy = l_proxy.close(false, false); // do not check for queue, just close it
         if (!keepProxy) {
-          removeClientProxy(proxy);
+          removeClientProxy(l_proxy);
         }
       }
       throw ioe;
@@ -702,39 +580,41 @@ public class CacheClientNotifier {
     // will ensure that the response byte is sent to the client before
     // the marker message. If the client is durable, the message processor
     // is not started until the clientReady message is received.
-    if (!clientIsDurable && proxy != null && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
+    if (!clientIsDurable && l_proxy != null
+        && responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
       // The startOrResumeMessageDispatcher tests if the proxy is a primary.
       // If this is a secondary proxy, the dispatcher is not started.
       // The false parameter signifies that a marker message has not already been
       // processed. This will generate and send one.
-      proxy.startOrResumeMessageDispatcher(false);
+      l_proxy.startOrResumeMessageDispatcher(false);
     }
 
     if (responseByte == Acceptor.SUCCESSFUL_SERVER_TO_CLIENT) {
       if (logger.isDebugEnabled()) {
-        logger.debug("CacheClientNotifier: Successfully registered {}", proxy);
+        logger.debug("CacheClientNotifier: Successfully registered {}", l_proxy);
       }
     } else {
       logger.warn(LocalizedMessage.create(
           LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_UNSUCCESSFULLY_REGISTERED_CLIENT_WITH_IDENTIFIER__0,
           proxyId));
     }
-    return proxy;
+    return l_proxy;
   }
 
-  private boolean initializeProxy(CacheClientProxy proxy) throws CacheException {
-    if (!this.isProxyInInitializationMode(proxy)) {
+  private boolean initializeProxy(CacheClientProxy l_proxy) throws IOException, CacheException {
+    boolean status = false;
+    if (!this.isProxyInInitializationMode(l_proxy)) {
       if (logger.isDebugEnabled()) {
-        logger.debug("Initializing proxy: {}", proxy);
+        logger.debug("Initializing proxy: {}", l_proxy);
       }
       try {
         // Add client proxy to initialization list. This has to be done before
         // the queue is created so that events can be buffered here for delivery
         // to the queue once it's initialized (bug #41681 and others)
-        addClientInitProxy(proxy);
-        proxy.initializeMessageDispatcher();
+        addClientInitProxy(l_proxy);
+        l_proxy.initializeMessageDispatcher();
         // Initialization success. Add to client proxy list.
-        addClientProxy(proxy);
+        addClientProxy(l_proxy);
         return true;
       } catch (RegionExistsException ree) {
         if (logger.isDebugEnabled()) {
@@ -744,10 +624,10 @@ public class CacheClientNotifier {
         }
         // This will return false;
       } finally {
-        removeClientInitProxy(proxy);
+        removeClientInitProxy(l_proxy);
       }
     }
-    return false;
+    return status;
   }
 
   /**
@@ -790,9 +670,9 @@ public class CacheClientNotifier {
     boolean success = false;
     CacheClientProxy proxy = getClientProxy(proxyId);
     if (proxy != null) {
-      HARegionQueue haRegionQueue = proxy.getHARegionQueue();
-      haRegionQueue.addDispatchedMessage(
-          new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()), eid.getSequenceID());
+      HARegionQueue harq = proxy.getHARegionQueue();
+      harq.addDispatchedMessage(new ThreadIdentifier(eid.getMembershipID(), eid.getThreadID()),
+          eid.getSequenceID());
       success = true;
     }
     return success;
@@ -810,6 +690,11 @@ public class CacheClientNotifier {
     }
     CacheClientProxy proxy = getClientProxy(membershipID);
     if (proxy != null) {
+      // Close the port if the proxy represents the client and contains the
+      // port)
+      // // If so, remove the port from the client's remote ports
+      // proxy.removePort(clientPort);
+      // Set the keepalive flag
       proxy.setKeepAlive(keepAlive);
     }
   }
@@ -819,7 +704,7 @@ public class CacheClientNotifier {
    *
    * @param memberId Uniquely identifies the client
    */
-  void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
+  public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) {
     if (logger.isDebugEnabled()) {
       logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId);
     }
@@ -874,7 +759,7 @@ public class CacheClientNotifier {
    * in it that determines which clients will receive the event.
    */
   public static void notifyClients(InternalCacheEvent event) {
-    CacheClientNotifier instance = getInstance();
+    CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonNotifyClients(event, null);
     }
@@ -884,16 +769,14 @@ public class CacheClientNotifier {
    * notify interested clients of the given cache event using the given update message. The event
    * should have routing information in it that determines which clients will receive the event.
    */
-  public static void notifyClients(InternalCacheEvent event,
-      ClientUpdateMessage clientUpdateMessage) {
-    CacheClientNotifier instance = getInstance();
+  public static void notifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
+    CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      instance.singletonNotifyClients(event, clientUpdateMessage);
+      instance.singletonNotifyClients(event, cmsg);
     }
   }
 
-  private void singletonNotifyClients(InternalCacheEvent event,
-      ClientUpdateMessage clientUpdateMessage) {
+  private void singletonNotifyClients(InternalCacheEvent event, ClientUpdateMessage cmsg) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     final boolean isTraceEnabled = logger.isTraceEnabled();
 
@@ -916,17 +799,17 @@ public class CacheClientNotifier {
     long startTime = this.statistics.startTime();
 
     ClientUpdateMessageImpl clientMessage;
-    if (clientUpdateMessage == null) {
+    if (cmsg == null) {
       clientMessage = constructClientMessage(event);
     } else {
-      clientMessage = (ClientUpdateMessageImpl) clientUpdateMessage;
+      clientMessage = (ClientUpdateMessageImpl) cmsg;
     }
     if (clientMessage == null) {
       return;
     }
 
     // Holds the clientIds to which filter message needs to be sent.
-    Set<ClientProxyMembershipID> filterClients = new HashSet<>();
+    Set<ClientProxyMembershipID> filterClients = new HashSet();
 
     // Add CQ info.
     if (filterInfo.getCQs() != null) {
@@ -985,7 +868,7 @@ public class CacheClientNotifier {
       }
     }
 
-    Conflatable conflatable;
+    Conflatable conflatable = null;
 
     if (clientMessage instanceof ClientTombstoneMessage) {
       // bug #46832 - HAEventWrapper deserialization can't handle subclasses
@@ -1032,7 +915,7 @@ public class CacheClientNotifier {
         String cqName = regionProfile.getRealCqID(cqID);
         if (cqName != null) {
           ServerCQ cq = regionProfile.getCq(cqName);
-          if (cq != null && e.getValue().equals(MessageType.LOCAL_DESTROY)) {
+          if (cq != null && e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY))) {
             cq.removeFromCqResultKeys(entryEvent.getKey(), true);
           }
         }
@@ -1045,19 +928,32 @@ public class CacheClientNotifier {
    * interest established, or override the isClientInterested method to implement its own routing
    */
   public static void routeClientMessage(Conflatable clientMessage) {
-    CacheClientNotifier instance = getInstance();
+    CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
-      // ok to use keySet here because all we do is call getClientProxy with these keys
-      instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet());
+      instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok
+                                                                                             // to
+                                                                                             // use
+                                                                                             // keySet
+                                                                                             // here
+                                                                                             // because
+                                                                                             // all
+                                                                                             // we
+                                                                                             // do
+                                                                                             // is
+                                                                                             // call
+                                                                                             // getClientProxy
+                                                                                             // with
+                                                                                             // these
+                                                                                             // keys
     }
   }
 
   /**
    * this is for server side registration of client queue
    */
-  static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
+  public static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
       ClientProxyMembershipID clientProxyMembershipId) {
-    CacheClientNotifier instance = getInstance();
+    CacheClientNotifier instance = ccnSingleton;
     if (instance != null) {
       instance.singletonRouteClientMessage(clientMessage,
           Collections.singleton(clientProxyMembershipId));
@@ -1067,25 +963,27 @@ public class CacheClientNotifier {
   private void singletonRouteClientMessage(Conflatable conflatable,
       Collection<ClientProxyMembershipID> filterClients) {
 
-    this.cache.getCancelCriterion().checkCancelInProgress(null);
+    this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified
+                                                                  // but no p2p distribution
 
     List<CacheClientProxy> deadProxies = null;
     for (ClientProxyMembershipID clientId : filterClients) {
-      CacheClientProxy proxy = this.getClientProxy(clientId, true);
+      CacheClientProxy proxy;
+      proxy = this.getClientProxy(clientId, true);
       if (proxy != null) {
         if (proxy.isAlive() || proxy.isPaused() || proxy.isConnected() || proxy.isDurable()) {
           proxy.deliverMessage(conflatable);
         } else {
           proxy.getStatistics().incMessagesFailedQueued();
           if (deadProxies == null) {
-            deadProxies = new ArrayList<>();
+            deadProxies = new ArrayList<CacheClientProxy>();
           }
           deadProxies.add(proxy);
         }
-        this.blackListSlowReceiver(proxy);
+        this.blackListSlowReciever(proxy);
       }
     }
-    checkAndRemoveFromClientMessagesRegion(conflatable);
+    checkAndRemoveFromClientMsgsRegion(conflatable);
     // Remove any dead clients from the clients to notify
     if (deadProxies != null) {
       closeDeadProxies(deadProxies, false);
@@ -1096,7 +994,7 @@ public class CacheClientNotifier {
    * processes the given collection of durable and non-durable client identifiers, returning a
    * collection of non-durable identifiers of clients connected to this VM
    */
-  Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
+  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) {
     return getProxyIDs(mixedDurableAndNonDurableIDs, false);
   }
 
@@ -1105,44 +1003,52 @@ public class CacheClientNotifier {
    * collection of non-durable identifiers of clients connected to this VM. This version can check
    * for proxies in initialization as well as fully initialized proxies.
    */
-  private Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
+  public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs,
       boolean proxyInInitMode) {
-    Set<ClientProxyMembershipID> result = new HashSet<>();
+    Set<ClientProxyMembershipID> result = new HashSet();
     for (Object id : mixedDurableAndNonDurableIDs) {
       if (id instanceof String) {
         CacheClientProxy clientProxy = getClientProxy((String) id, true);
         if (clientProxy != null) {
           result.add(clientProxy.getProxyID());
         }
+        // else { we don't have a proxy for the given durable ID }
       } else {
         // try to canonicalize the ID.
         CacheClientProxy proxy = getClientProxy((ClientProxyMembershipID) id, true);
         if (proxy != null) {
+          // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: found match for " + id + ": " +
+          // proxy.getProxyID());
           result.add(proxy.getProxyID());
+        } else {
+          // this._logger.info(LocalizedStrings.DEBUG, "BRUCE: did not find match for " + id);
+          // this was causing OOMEs in HARegion initial image processing because
+          // messages had routing for clients unknown to this server
+          // result.add((ClientProxyMembershipID)id);
         }
       }
     }
     return result;
   }
 
-  private void blackListSlowReceiver(CacheClientProxy clientProxy) {
+  private void blackListSlowReciever(CacheClientProxy clientProxy) {
     final CacheClientProxy proxy = clientProxy;
-    if (proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever()
-        && !this.blackListedClients.contains(proxy.getProxyID())) {
+    if ((proxy.getHARegionQueue() != null && proxy.getHARegionQueue().isClientSlowReciever())
+        && !blackListedClients.contains(proxy.getProxyID())) {
       // log alert with client info.
       logger.warn(
           LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CLIENT_0_IS_A_SLOW_RECEIVER,
               new Object[] {proxy.getProxyID()}));
       addToBlacklistedClient(proxy.getProxyID());
-      InternalDistributedSystem system = getCache().getInternalDistributedSystem();
-      final DM dm = system.getDistributionManager();
-
+      InternalDistributedSystem ids =
+          (InternalDistributedSystem) this.getCache().getDistributedSystem();
+      final DM dm = ids.getDistributionManager();
       dm.getWaitingThreadPool().execute(new Runnable() {
-        @Override
         public void run() {
 
           CacheDistributionAdvisor advisor =
-              proxy.getHARegionQueue().getRegion().getCacheDistributionAdvisor();
+              ((DistributedRegion) proxy.getHARegionQueue().getRegion())
+                  .getCacheDistributionAdvisor();
           Set members = advisor.adviseCacheOp();
 
           // Send client blacklist message
@@ -1168,24 +1074,25 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Initializes a {@code ClientUpdateMessage} from an operation and event
+   * Initializes a <code>ClientUpdateMessage</code> from an operation and event
    *
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
    * @param event The event containing the data to be updated
-   * @return a {@code ClientUpdateMessage}
+   * @return a <code>ClientUpdateMessage</code>
    */
-  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event) {
+  private ClientUpdateMessageImpl initializeMessage(EnumListenerEvent operation, CacheEvent event)
+      throws Exception {
     if (!supportsOperation(operation)) {
-      throw new UnsupportedOperationException(
+      throw new Exception(
           LocalizedStrings.CacheClientNotifier_THE_CACHE_CLIENT_NOTIFIER_DOES_NOT_SUPPORT_OPERATIONS_OF_TYPE_0
               .toLocalizedString(operation));
     }
-
+    // String regionName = event.getRegion().getFullPath();
     Object keyOfInterest = null;
     final EventID eventIdentifier;
     ClientProxyMembershipID membershipID = null;
     boolean isNetLoad = false;
-    Object callbackArgument;
+    Object callbackArgument = null;
     byte[] delta = null;
     VersionTag versionTag = null;
 
@@ -1222,19 +1129,19 @@ public class CacheClientNotifier {
     }
 
     if (isNetLoad) {
-      clientUpdateMsg.setIsNetLoad(true);
+      clientUpdateMsg.setIsNetLoad(isNetLoad);
     }
 
     return clientUpdateMsg;
   }
 
   /**
-   * Returns whether the {@code CacheClientNotifier} supports the input operation.
+   * Returns whether the <code>CacheClientNotifier</code> supports the input operation.
    *
    * @param operation The operation that occurred (e.g. AFTER_CREATE)
-   * @return whether the {@code CacheClientNotifier} supports the input operation
+   * @return whether the <code>CacheClientNotifier</code> supports the input operation
    */
-  private boolean supportsOperation(EnumListenerEvent operation) {
+  protected boolean supportsOperation(EnumListenerEvent operation) {
     return operation == EnumListenerEvent.AFTER_CREATE
         || operation == EnumListenerEvent.AFTER_UPDATE
         || operation == EnumListenerEvent.AFTER_DESTROY
@@ -1304,7 +1211,7 @@ public class CacheClientNotifier {
       int regionDataPolicy) {
     if (regionDataPolicy == 0) {
       if (!regionsWithEmptyDataPolicy.containsKey(regionName)) {
-        regionsWithEmptyDataPolicy.put(regionName, 0);
+        regionsWithEmptyDataPolicy.put(regionName, Integer.valueOf(0));
       }
     }
   }
@@ -1315,8 +1222,8 @@ public class CacheClientNotifier {
    * @param regionName The name of the region of interest
    * @param keyOfInterest The name of the key of interest
    * @param isClosing Whether the caller is closing
-   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
-   *        this {@code Region} and key
+   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
+   *        in this <code>Region</code> and key
    */
   public void unregisterClientInterest(String regionName, Object keyOfInterest, int interestType,
       boolean isClosing, ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1337,8 +1244,8 @@ public class CacheClientNotifier {
    *
    * @param regionName The name of the region of interest
    * @param keysOfInterest The list of keys of interest
-   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
-   *        this {@code Region} and key
+   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
+   *        in this <code>Region</code> and key
    */
   public void registerClientInterest(String regionName, List keysOfInterest,
       ClientProxyMembershipID membershipID, boolean isDurable, boolean sendUpdatesAsInvalidates,
@@ -1371,8 +1278,8 @@ public class CacheClientNotifier {
    * @param regionName The name of the region of interest
    * @param keysOfInterest The list of keys of interest
    * @param isClosing Whether the caller is closing
-   * @param membershipID The {@code ClientProxyMembershipID} of the client no longer interested in
-   *        this {@code Region} and key
+   * @param membershipID The <code>ClientProxyMembershipID</code> of the client no longer interested
+   *        in this <code>Region</code> and key
    */
   public void unregisterClientInterest(String regionName, List keysOfInterest, boolean isClosing,
       ClientProxyMembershipID membershipID, boolean keepalive) {
@@ -1394,22 +1301,21 @@ public class CacheClientNotifier {
    * 
    * @since GemFire 5.7
    */
-  private void checkAndRemoveFromClientMessagesRegion(Conflatable conflatable) {
-    if (this.haContainer == null) {
+  private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
+    if (haContainer == null) {
       return;
     }
-
     if (conflatable instanceof HAEventWrapper) {
       HAEventWrapper wrapper = (HAEventWrapper) conflatable;
       if (!wrapper.getIsRefFromHAContainer()) {
-        wrapper = (HAEventWrapper) this.haContainer.getKey(wrapper);
+        wrapper = (HAEventWrapper) haContainer.getKey(wrapper);
         if (wrapper != null && !wrapper.getPutInProgress()) {
           synchronized (wrapper) {
             if (wrapper.getReferenceCount() == 0L) {
               if (logger.isDebugEnabled()) {
                 logger.debug("Removing event from haContainer: {}", wrapper);
               }
-              this.haContainer.remove(wrapper);
+              haContainer.remove(wrapper);
             }
           }
         }
@@ -1422,7 +1328,7 @@ public class CacheClientNotifier {
             if (logger.isDebugEnabled()) {
               logger.debug("Removing event from haContainer: {}", wrapper);
             }
-            this.haContainer.remove(wrapper);
+            haContainer.remove(wrapper);
           }
         }
       }
@@ -1430,12 +1336,12 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns the {@code CacheClientProxy} associated to the membershipID *
+   * Returns the <code>CacheClientProxy</code> associated to the membershipID *
    *
-   * @return the {@code CacheClientProxy} associated to the membershipID
+   * @return the <code>CacheClientProxy</code> associated to the membershipID
    */
   public CacheClientProxy getClientProxy(ClientProxyMembershipID membershipID) {
-    return (CacheClientProxy) this.clientProxies.get(membershipID);
+    return (CacheClientProxy) this._clientProxies.get(membershipID);
   }
 
   /**
@@ -1446,25 +1352,25 @@ public class CacheClientNotifier {
       boolean proxyInInitMode) {
     CacheClientProxy proxy = getClientProxy(membershipID);
     if (proxyInInitMode && proxy == null) {
-      proxy = (CacheClientProxy) this.initClientProxies.get(membershipID);
+      proxy = (CacheClientProxy) this._initClientProxies.get(membershipID);
     }
     return proxy;
   }
 
   /**
-   * Returns the {@code CacheClientProxy} associated to the durableClientId
+   * Returns the <code>CacheClientProxy</code> associated to the durableClientId
    * 
-   * @return the {@code CacheClientProxy} associated to the durableClientId
+   * @return the <code>CacheClientProxy</code> associated to the durableClientId
    */
   public CacheClientProxy getClientProxy(String durableClientId) {
     return getClientProxy(durableClientId, false);
   }
 
   /**
-   * Returns the {@code CacheClientProxy} associated to the durableClientId. This version of the
-   * method can check for initializing proxies as well as fully initialized proxies.
+   * Returns the <code>CacheClientProxy</code> associated to the durableClientId. This version of
+   * the method can check for initializing proxies as well as fully initialized proxies.
    * 
-   * @return the {@code CacheClientProxy} associated to the durableClientId
+   * @return the <code>CacheClientProxy</code> associated to the durableClientId
    */
   public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -1473,9 +1379,9 @@ public class CacheClientNotifier {
     if (isDebugEnabled) {
       logger.debug("CacheClientNotifier: Determining client for {}", durableClientId);
     }
-
     CacheClientProxy proxy = null;
-    for (CacheClientProxy clientProxy : getClientProxies()) {
+    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
+      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
       if (isTraceEnabled) {
         logger.trace("CacheClientNotifier: Checking client {}", clientProxy);
       }
@@ -1488,10 +1394,9 @@ public class CacheClientNotifier {
         break;
       }
     }
-
     if (proxy == null && proxyInInitMode) {
-      for (Object clientProxyObject : this.initClientProxies.values()) {
-        CacheClientProxy clientProxy = (CacheClientProxy) clientProxyObject;
+      for (Iterator i = this._initClientProxies.values().iterator(); i.hasNext();) {
+        CacheClientProxy clientProxy = (CacheClientProxy) i.next();
         if (isTraceEnabled) {
           logger.trace("CacheClientNotifier: Checking initializing client {}", clientProxy);
         }
@@ -1510,6 +1415,37 @@ public class CacheClientNotifier {
   }
 
   /**
+   * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID *
+   * 
+   * @return the <code>CacheClientProxy</code> associated to the same distributed system
+   */
+  public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this,
+          membershipID);
+      logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}",
+          this, getClientProxies().size());
+    }
+    CacheClientProxy proxy = null;
+    for (Iterator i = getClientProxies().iterator(); i.hasNext();) {
+      CacheClientProxy clientProxy = (CacheClientProxy) i.next();
+      if (isDebugEnabled) {
+        logger.debug("CacheClientNotifier: Checking client {}", clientProxy);
+      }
+      if (clientProxy.isSameDSMember(membershipID)) {
+        proxy = clientProxy;
+        if (isDebugEnabled) {
+          logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy,
+              membershipID);
+        }
+        break;
+      }
+    }
+    return proxy;
+  }
+
+  /**
    * It will remove the clients connected to the passed acceptorId. If its the only server, shuts
    * down this instance.
    */
@@ -1517,10 +1453,10 @@ public class CacheClientNotifier {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}",
-          getCache().getCacheServers().size());
+          this.getCache().getCacheServers().size());
     }
 
-    Iterator it = this.clientProxies.values().iterator();
+    Iterator it = this._clientProxies.values().iterator();
     // Close all the client proxies
     while (it.hasNext()) {
       CacheClientProxy proxy = (CacheClientProxy) it.next();
@@ -1540,16 +1476,16 @@ public class CacheClientNotifier {
       }
     }
 
-    if (noActiveServer() && getInstance() != null) {
+    if (noActiveServer() && ccnSingleton != null) {
       ccnSingleton = null;
-      if (this.haContainer != null) {
-        this.haContainer.cleanUp();
+      if (haContainer != null) {
+        haContainer.cleanUp();
         if (isDebugEnabled) {
-          logger.debug("haContainer ({}) is now cleaned up.", this.haContainer.getName());
+          logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
         }
       }
       this.clearCompiledQueries();
-      this.blackListedClients.clear();
+      blackListedClients.clear();
 
       // cancel the ping task
       this.clientPingTask.cancel();
@@ -1562,7 +1498,7 @@ public class CacheClientNotifier {
   }
 
   private boolean noActiveServer() {
-    for (CacheServer server : getCache().getCacheServers()) {
+    for (CacheServer server : this.getCache().getCacheServers()) {
       if (server.isRunning()) {
         return false;
       }
@@ -1571,40 +1507,41 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Adds a new {@code CacheClientProxy} to the list of known client proxies
+   * Adds a new <code>CacheClientProxy</code> to the list of known client proxies
    *
-   * @param proxy The {@code CacheClientProxy} to add
+   * @param proxy The <code>CacheClientProxy</code> to add
    */
-  void addClientProxy(CacheClientProxy proxy) {
+  protected void addClientProxy(CacheClientProxy proxy) throws IOException {
+    // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy);
     getCache(); // ensure cache reference is up to date so firstclient state is correct
-    this.clientProxies.put(proxy.getProxyID(), proxy);
+    this._clientProxies.put(proxy.getProxyID(), proxy);
     // Remove this proxy from the init proxy list.
     removeClientInitProxy(proxy);
-    this.connectionListener.queueAdded(proxy.getProxyID());
-    if (proxy.clientConflation != HandShake.CONFLATION_ON) {
+    this._connectionListener.queueAdded(proxy.getProxyID());
+    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
       // Delta not supported with conflation ON
-      ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
+      ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       /*
        * #41788 - If the client connection init starts while cache/member is shutting down,
        * ClientHealthMonitor.getInstance() might return null.
        */
-      if (clientHealthMonitor != null) {
-        clientHealthMonitor.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
+      if (chm != null) {
+        chm.numOfClientsPerVersion.incrementAndGet(proxy.getVersion().ordinal());
       }
     }
     this.timedOutDurableClientProxies.remove(proxy.getProxyID());
   }
 
-  private void addClientInitProxy(CacheClientProxy proxy) {
-    this.initClientProxies.put(proxy.getProxyID(), proxy);
+  protected void addClientInitProxy(CacheClientProxy proxy) throws IOException {
+    this._initClientProxies.put(proxy.getProxyID(), proxy);
   }
 
-  private void removeClientInitProxy(CacheClientProxy proxy) {
-    this.initClientProxies.remove(proxy.getProxyID());
+  protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException {
+    this._initClientProxies.remove(proxy.getProxyID());
   }
 
-  private boolean isProxyInInitializationMode(CacheClientProxy proxy) {
-    return this.initClientProxies.containsKey(proxy.getProxyID());
+  protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException {
+    return this._initClientProxies.containsKey(proxy.getProxyID());
   }
 
   /**
@@ -1615,7 +1552,8 @@ public class CacheClientNotifier {
    */
   public Set getActiveClients() {
     Set clients = new HashSet();
-    for (CacheClientProxy proxy : getClientProxies()) {
+    for (Iterator iter = getClientProxies().iterator(); iter.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) iter.next();
       if (proxy.hasRegisteredInterested()) {
         ClientProxyMembershipID proxyID = proxy.getProxyID();
         clients.add(proxyID);
@@ -1631,8 +1569,8 @@ public class CacheClientNotifier {
    */
   public Map getAllClients() {
     Map clients = new HashMap();
-    for (final Object o : this.clientProxies.values()) {
-      CacheClientProxy proxy = (CacheClientProxy) o;
+    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) iter.next();
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       clients.put(proxyID, new CacheClientStatus(proxyID));
     }
@@ -1648,8 +1586,8 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasDurableClient(String durableId) {
-    for (Object clientProxyObject : this.clientProxies.values()) {
-      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
+    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) iter.next();
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       if (durableId.equals(proxyID.getDurableId())) {
         return true;
@@ -1667,11 +1605,15 @@ public class CacheClientNotifier {
    * @since GemFire 5.6
    */
   public boolean hasPrimaryForDurableClient(String durableId) {
-    for (Object clientProxyObject : this.clientProxies.values()) {
-      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
+    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) iter.next();
       ClientProxyMembershipID proxyID = proxy.getProxyID();
       if (durableId.equals(proxyID.getDurableId())) {
-        return proxy.isPrimary();
+        if (proxy.isPrimary()) {
+          return true;
+        } else {
+          return false;
+        }
       }
     }
     return false;
@@ -1684,9 +1626,9 @@ public class CacheClientNotifier {
    */
   public Map getClientQueueSizes() {
     Map/* <ClientProxyMembershipID,Integer> */ queueSizes = new HashMap();
-    for (Object clientProxyObject : this.clientProxies.values()) {
-      CacheClientProxy proxy = (CacheClientProxy) clientProxyObject;
-      queueSizes.put(proxy.getProxyID(), proxy.getQueueSize());
+    for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) iter.next();
+      queueSizes.put(proxy.getProxyID(), Integer.valueOf(proxy.getQueueSize()));
     }
     return queueSizes;
   }
@@ -1703,20 +1645,25 @@ public class CacheClientNotifier {
   public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException {
     CacheClientProxy proxy = getClientProxy(durableClientId);
     // close and drain
-    return proxy != null && proxy.closeClientCq(clientCQName);
+    if (proxy != null) {
+      return proxy.closeClientCq(clientCQName);
+    }
+    return false;
   }
 
   /**
-   * Removes an existing {@code CacheClientProxy} from the list of known client proxies
+   * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies
    *
-   * @param proxy The {@code CacheClientProxy} to remove
+   * @param proxy The <code>CacheClientProxy</code> to remove
    */
-  void removeClientProxy(CacheClientProxy proxy) {
+  protected void removeClientProxy(CacheClientProxy proxy) {
+    // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new
+    // Exception("stack trace"));
     ClientProxyMembershipID client = proxy.getProxyID();
-    this.clientProxies.remove(client);
-    this.connectionListener.queueRemoved();
-    getCache().cleanupForClient(this, client);
-    if (proxy.clientConflation != HandShake.CONFLATION_ON) {
+    this._clientProxies.remove(client);
+    this._connectionListener.queueRemoved();
+    this.getCache().cleanupForClient(this, client);
+    if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) {
       ClientHealthMonitor chm = ClientHealthMonitor.getInstance();
       if (chm != null) {
         chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal());
@@ -1728,18 +1675,18 @@ public class CacheClientNotifier {
     this.timedOutDurableClientProxies.add(client);
   }
 
-  private boolean isTimedOut(ClientProxyMembershipID client) {
+  public boolean isTimedOut(ClientProxyMembershipID client) {
     return this.timedOutDurableClientProxies.contains(client);
   }
 
   /**
-   * Returns an unmodifiable Collection of known {@code CacheClientProxy} instances. The collection
-   * is not static so its contents may change.
+   * Returns an unmodifiable Collection of known <code>CacheClientProxy</code> instances. The
+   * collection is not static so its contents may change.
    *
-   * @return the collection of known {@code CacheClientProxy} instances
+   * @return the collection of known <code>CacheClientProxy</code> instances
    */
   public Collection<CacheClientProxy> getClientProxies() {
-    return Collections.unmodifiableCollection(this.clientProxies.values());
+    return Collections.unmodifiableCollection(this._clientProxies.values());
   }
 
   private void closeAllClientCqs(CacheClientProxy proxy) {
@@ -1751,12 +1698,12 @@ public class CacheClientNotifier {
           logger.debug("CacheClientNotifier: Closing client CQs: {}", proxy);
         }
         cqService.closeClientCqs(proxy.getProxyID());
-      } catch (CqException e) {
+      } catch (CqException e1) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientNotifier_UNABLE_TO_CLOSE_CQS_FOR_THE_CLIENT__0,
             proxy.getProxyID()));
         if (isDebugEnabled) {
-          logger.debug(e);
+          e1.printStackTrace();
         }
       }
     }
@@ -1765,7 +1712,7 @@ public class CacheClientNotifier {
   /**
    * Shuts down durable client proxy
    */
-  public boolean closeDurableClientProxy(String durableClientId) {
+  public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
     CacheClientProxy ccp = getClientProxy(durableClientId);
     if (ccp == null) {
       return false;
@@ -1779,22 +1726,22 @@ public class CacheClientNotifier {
       if (logger.isDebugEnabled()) {
         logger.debug("Cannot close running durable client: {}", durableClientId);
       }
-      throw new IllegalStateException("Cannot close a running durable client : " + durableClientId);
+      // TODO: never throw an anonymous inner class
+      throw new CacheException("Cannot close a running durable client : " + durableClientId) {};
     }
   }
 
   /**
-   * Close dead {@code CacheClientProxy} instances
+   * Close dead <code>CacheClientProxy</code> instances
    *
-   * @param deadProxies The list of {@code CacheClientProxy} instances to close
+   * @param deadProxies The list of <code>CacheClientProxy</code> instances to close
    */
   private void closeDeadProxies(List deadProxies, boolean stoppedNormally) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    for (Object deadProxy : deadProxies) {
-      CacheClientProxy proxy = (CacheClientProxy) deadProxy;
-      if (isDebugEnabled) {
+    for (Iterator i = deadProxies.iterator(); i.hasNext();) {
+      CacheClientProxy proxy = (CacheClientProxy) i.next();
+      if (isDebugEnabled)
         logger.debug("CacheClientNotifier: Closing dead client: {}", proxy);
-      }
 
       // Close the proxy
       boolean keepProxy = false;
@@ -1810,7 +1757,8 @@ public class CacheClientNotifier {
       if (keepProxy) {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_KEEPING_PROXY_FOR_DURABLE_CLIENT_NAMED_0_FOR_1_SECONDS_2,
-            new Object[] {proxy.getDurableId(), proxy.getDurableTimeout(), proxy}));
+            new Object[] {proxy.getDurableId(), Integer.valueOf(proxy.getDurableTimeout()),
+                proxy}));
       } else {
         closeAllClientCqs(proxy);
         if (isDebugEnabled) {
@@ -1823,10 +1771,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Registers a new {@code InterestRegistrationListener} with the set of
-   * {@code InterestRegistrationListener}s.
+   * Registers a new <code>InterestRegistrationListener</code> with the set of
+   * <code>InterestRegistrationListener</code>s.
    * 
-   * @param listener The {@code InterestRegistrationListener} to register
+   * @param listener The <code>InterestRegistrationListener</code> to register
    * 
    * @since GemFire 5.8Beta
    */
@@ -1835,10 +1783,10 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Unregisters an existing {@code InterestRegistrationListener} from the set of
-   * {@code InterestRegistrationListener}s.
+   * Unregisters an existing <code>InterestRegistrationListener</code> from the set of
+   * <code>InterestRegistrationListener</code>s.
    * 
-   * @param listener The {@code InterestRegistrationListener} to unregister
+   * @param listener The <code>InterestRegistrationListener</code> to unregister
    * 
    * @since GemFire 5.8Beta
    */
@@ -1847,11 +1795,11 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns a read-only collection of {@code InterestRegistrationListener}s registered with this
-   * notifier.
+   * Returns a read-only collection of <code>InterestRegistrationListener</code>s registered with
+   * this notifier.
    * 
-   * @return a read-only collection of {@code InterestRegistrationListener}s registered with this
-   *         notifier
+   * @return a read-only collection of <code>InterestRegistrationListener</code>s registered with
+   *         this notifier
    * 
    * @since GemFire 5.8Beta
    */
@@ -1863,17 +1811,17 @@ public class CacheClientNotifier {
    * 
    * @since GemFire 5.8Beta
    */
-  boolean containsInterestRegistrationListeners() {
+  protected boolean containsInterestRegistrationListeners() {
     return !this.writableInterestRegistrationListeners.isEmpty();
   }
 
   /**
+   * 
    * @since GemFire 5.8Beta
    */
-  void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
-    for (Object writableInterestRegistrationListener : this.writableInterestRegistrationListeners) {
-      InterestRegistrationListener listener =
-          (InterestRegistrationListener) writableInterestRegistrationListener;
+  protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) {
+    for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) {
+      InterestRegistrationListener listener = (InterestRegistrationListener) i.next();
       if (event.isRegister()) {
         listener.afterRegisterInterest(event);
       } else {
@@ -1892,55 +1840,192 @@ public class CacheClientNotifier {
   }
 
   /**
-   * Returns this {@code CacheClientNotifier}'s {@code InternalCache}.
+   * Returns this <code>CacheClientNotifier</code>'s <code>InternalCache</code>.
    * 
-   * @return this {@code CacheClientNotifier}'s {@code InternalCache}
+   * @return this <code>CacheClientNotifier</code>'s <code>InternalCache</code>
    */
   protected InternalCache getCache() { // TODO:SYNC: looks wrong
-    if (this.cache != null && this.cache.isClosed()) {
+    if (this._cache != null && this._cache.isClosed()) {
       InternalCache cache = GemFireCacheImpl.getInstance();
       if (cache != null) {
-        this.cache = cache;
+        this._cache = cache;
         this.logWriter = cache.getInternalLogWriter();
         this.securityLogWriter = cache.getSecurityInternalLogWriter();
       }
     }
-    return this.cache;
+    return this._cache;
   }
 
   /**
-   * Returns this {@code CacheClientNotifier}'s maximum message count.
+   * Returns this <code>CacheClientNotifier</code>'s maximum message count.
    * 
-   * @return this {@code CacheClientNotifier}'s maximum message count
+   * @return this <code>CacheClientNotifier</code>'s maximum message count
    */
   protected int getMaximumMessageCount() {
     return this.maximumMessageCount;
   }
 
   /**
-   * Returns this {@code CacheClientNotifier}'s message time-to-live.
+   * Returns this <code>CacheClientNotifier</code>'s message time-to-live.
    * 
-   * @return this {@code CacheClientNotifier}'s message time-to-live
+   * @return this <code>CacheClientNotifier</code>'s message time-to-live
    */
   protected int getMessageTimeToLive() {
     return this.messageTimeToLive;
   }
 
-  void handleInterestEvent(InterestRegistrationEvent event) {
+  protected void handleInterestEvent(InterestRegistrationEvent event) {
     LocalRegion region = (LocalRegion) event.getRegion();
     region.handleInterestEvent(event);
   }
 
-  void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) {
-    DM dm = getCache().getInternalDistributedSystem().getDistributionManager();
+  /**
+   * @param cache The GemFire <code>InternalCache</code>
+   * @param listener a listener which should receive notifications abouts queues being added or
+   *        removed.
+   */
+  private CacheClientNotifier(InternalCache cache, CacheServerStats acceptorStats,
+      int maximumMessageCount, int messageTimeToLive, ConnectionListener listener,
+      List overflowAttributesList, boolean isGatewayReceiver) {
+    // Set the Cache
+    setCache(cache);
+    this.acceptorStats = acceptorStats;
+    this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms
+                                                 // for close
+
+    // Set the LogWriter
+    this.logWriter = (InternalLogWriter) cache.getLogger();
+
+    this._connectionListener = listener;
+
+    // Set the security LogWriter
+    this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger();
+
+    this.maximumMessageCount = maximumMessageCount;
+    this.messageTimeToLive = messageTimeToLive;
+
+    // Initialize the statistics
+    StatisticsFactory factory;
+    if (isGatewayReceiver) {
+      factory = new DummyStatisticsFactory();
+    } else {
+      factory = this.getCache().getDistributedSystem();
+    }
+    this.statistics = new CacheClientNotifierStats(factory);
+
+    try {
+      this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
+      if (this.logFrequency <= 0) {
+        this.logFrequency = DEFAULT_LOG_FREQUENCY;
+      }
+    } catch (Exception e) {
+      this.logFrequency = DEFAULT_LOG_FREQUENCY;
+    }
+
+    eventEnqueueWaitTime =
+        Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
+    if (eventEnqueueWaitTime < 0) {
+      eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
+    }
+
+    // Schedule task to periodically ping clients.
+    scheduleClientPingTask();
+  }
+
+  /**
+   * this message is used to send interest registration to another server. Since interest
+   * registration performs a state-flush operation this message must not transmitted on an ordered
+   * socket
+   */
+  public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
+      implements MessageWithReply {
+    ClientProxyMembershipID clientId;
+    ClientInterestMessageImpl clientMessage;
+    int processorId;
+
+    ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
+        ClientInterestMessageImpl msg) {
+      this.clientId = clientID;
+      this.clientMessage = msg;
+    }
+
+    public ServerInterestRegistrationMessage() {}
+
+    static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
+        ClientInterestMessageImpl msg) {
+      ServerInterestRegistrationMessage smsg = new ServerInterestRegistrationMessage(clientID, msg);
+      Set recipients = dm.getOtherDistributionManagerIds();
+      smsg.setRecipients(recipients);
+      ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
+      smsg.processorId = rp.getProcessorId();
+      dm.putOutgoing(smsg);
+      try {
+        rp.waitForReplies();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    protected void process(DistributionManager dm) {
+      // Get the proxy for the proxy id
+      try {
+        CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+        if (ccn != null) {
+          CacheClientProxy proxy = ccn.getClientProxy(clientId);
+          // If this VM contains a proxy for the requested proxy id, forward the
+          // message on to the proxy for processing
+          if (proxy != null) {
+            proxy.processInterestMessage(this.clientMessage);
+          }
+        }
+      } finally {
+        ReplyMessage reply = new ReplyMessage();
+        reply.setProcessorId(this.processorId);
+        reply.setRecipient(getSender());
+        try {
+          dm.putOutgoing(reply);
+        } catch (CancelException e) {
+          // can't send a reply, so ignore the exception
+        }
+      }
+    }
+
+    public int getDSFID() {
+      return SERVER_INTEREST_REGISTRATION_MESSAGE;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      out.writeInt(this.processorId);
+      InternalDataSerializer.invokeToData(this.clientId, out);
+      InternalDataSerializer.invokeToData(this.clientMessage, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.processorId = in.readInt();
+      this.clientId = new ClientProxyMembershipID();
+      InternalDataSerializer.invokeFromData(this.clientId, in);
+      this.clientMessage = new ClientInterestMessageImpl();
+      InternalDataSerializer.invokeFromData(this.clientMessage, in);
+    }
+  }
+
+  protected void deliverInterestChange(ClientProxyMembershipID proxyID,
+      ClientInterestMessageImpl message) {
+    DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem())
+        .getDistributionManager();
     ServerInterestRegistrationMessage.sendInterestChange(dm, proxyID, message);
   }
 
-  CacheServerStats getAcceptorStats() {
+  public CacheServerStats getAcceptorStats() {
     return this.acceptorStats;
   }
 
-  SocketCloser getSocketCloser() {
+  public SocketCloser getSocketCloser() {
     return this.socketCloser;
   }
 
@@ -1963,8 +2048,8 @@ public class CacheClientNotifier {
   }
 
   private void clearCompiledQueries() {
-    if (!this.compiledQueries.isEmpty()) {
-      this.statistics.incCompiledQueryCount(-this.compiledQueries.size());
+    if (this.compiledQueries.size() > 0) {
+      this.statistics.incCompiledQueryCount(-(this.compiledQueries.size()));
       this.compiledQueries.clear();
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -1979,7 +2064,7 @@ public class CacheClientNotifier {
    * checks for the compiled queries that are not used and removes them.
    */
   private void startCompiledQueryCleanupThread() {
-    if (this.isCompiledQueryCleanupThreadStarted) {
+    if (isCompiledQueryCleanupThreadStarted) {
       return;
     }
 
@@ -2000,8 +2085,8 @@ public class CacheClientNotifier {
               statistics.incCompiledQueryCount(-1);
               if (isDebugEnabled) {
                 logger.debug("Removed compiled query from ccn.compliedQueries list. Query: "
-                    + q.getQueryString() + ". Total compiled queries are : "
-                    + statistics.getCompiledQueryCount());
+                             + q.getQueryString() + ". Total compiled queries are : "
+                             + statistics.getCompiledQueryCount());
               }
             }
           }
@@ -2009,23 +2094,23 @@ public class CacheClientNotifier {
       }
     };
 
-    synchronized (this.lockIsCompiledQueryCleanupThreadStarted) {
-      if (!this.isCompiledQueryCleanupThreadStarted) {
+    synchronized (lockIsCompiledQueryCleanupThreadStarted) {
+      if (!isCompiledQueryCleanupThreadStarted) {
         long period = DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME > 0
             ? DefaultQuery.TEST_COMPILED_QUERY_CLEAR_TIME : DefaultQuery.COMPILED_QUERY_CLEAR_TIME;
-        this.cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
+        _cache.getCCPTimer().scheduleAtFixedRate(task, period, period);
       }
-      this.isCompiledQueryCleanupThreadStarted = true;
+      isCompiledQueryCleanupThreadStarted = true;
     }
   }
 
-  void scheduleClientPingTask() {
+  protected void scheduleClientPingTask() {
     this.clientPingTask = new SystemTimer.SystemTimerTask() {
 
       @Override
       public void run2() {
         // If there are no proxies, return
-        if (clientProxies.isEmpty()) {
+        if (CacheClientNotifier.this._clientProxies.isEmpty()) {
           return;
         }
 
@@ -2060,72 +2145,210 @@ public class CacheClientNotifier {
     if (logger.isDebugEnabled()) {
       logger.debug("Scheduling client ping task with period={} ms", CLIENT_PING_TASK_PERIOD);
     }
-    CacheClientNotifier.this.cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
+    CacheClientNotifier.this._cache.getCCPTimer().scheduleAtFixedRate(this.clientPingTask,
         CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD);
   }
 
   /**
+   * A string representing all hosts used for delivery purposes.
+   */
+  protected static final String ALL_HOSTS = "ALL_HOSTS";
+
+  /**
+   * An int representing all ports used for delivery purposes.
+   */
+  protected static final int ALL_PORTS = -1;
+
+  /**
+   * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to
+   * CacheClientProxy. Note that the keys in this map are not updated when a durable client
+   * reconnects. To make sure you get the updated ClientProxyMembershipID use this map to lookup the
+   * CacheClientProxy and then call getProxyID on it.
+   */
+  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _clientProxies =
+      new ConcurrentHashMap();
+
+  /**
+   * The map of <code>CacheClientProxy</code> instances which are getting initialized. Maps
+   * ClientProxyMembershipID to CacheClientProxy.
+   */
+  private final ConcurrentMap/* <ClientProxyMembershipID, CacheClientProxy> */ _initClientProxies =
+      new ConcurrentHashMap();
+
+  private final HashSet<ClientProxyMembershipID> timedOutDurableClientProxies =
+      new HashSet<ClientProxyMembershipID>();
+
+  /**
+   * The GemFire <code>InternalCache</code>. Note that since this is a singleton class you should
+   * not use a direct reference to _cache in CacheClientNotifier code. Instead, you should always
+   * use <code>getCache()</code>
+   */
+  private InternalCache _cache;
+
+  private InternalLogWriter logWriter;
+
+  /**
+   * The GemFire security <code>LogWriter</code>
+   */
+  private InternalLogWriter securityLogWriter;
+
+  /** the maximum number of messages that can be enqueued in a client-queue. */
+  private int maximumMessageCount;
+
+  /**
+   * the time (in seconds) after which a message in the client queue will expire.
+   */
+  private int messageTimeToLive;
+
+  /**
+   * A listener which receives notifications about queues that are added or removed
+   */
+  private ConnectionListener _connectionListener;
+
+  private CacheServerStats acceptorStats;
+
+  /**
+   * haContainer can hold either the name of the client-messages-region (in case of eviction
+   * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
+   * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
+   */
+  private volatile HAContainerWrapper haContainer;
+
+  /**
+   * The size of the server-to-client communication socket buffers. This can be modified using the
+   * BridgeServer.SOCKET_BUFFER_SIZE system property.
+   */
+  static final private int socketBufferSize =
+      Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
+
+  /**
+   * The statistics for this notifier
+   */
+  protected final CacheClientNotifierStats statistics;
+
+  /**
+   * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used
+   * when modifying the set of listeners.
+   */
+  private final Set writableInterestRegistrationListeners = new CopyOnWriteArraySet();
+
+  /**
+   * The <code>InterestRegistrationListener</code> instances registered in this VM. This is used to
+   * provide a read-only <code>Set</code> of listeners.
+   */
+  private final Set readableInterestRegistrationListeners =
+      Collections.unmodifiableSet(writableInterestRegistrationListeners);
+
+  /**
+   * System property name for indicating how much frequently the "Queue full" message should be
+   * logged.
+   */
+  public static final String MAX_QUEUE_LOG_FREQUENCY =
+      DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
+
+  public static final long DEFAULT_LOG_FREQUENCY = 1000;
+
+  public static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
+      DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
+
+  public static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
+
+  /**
+   * System property value denoting the time in milliseconds. Any thread putting an event into a
+   * subscription queue, which is full, will wait this much time for the queue to make space. It'll
+   * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
+   * #51400.
+   */
+  public static int eventEnqueueWaitTime;
+
+  /**
+   * The frequency of logging the "Queue full" message.
+   */
+  private long logFrequency = DEFAULT_LOG_FREQUENCY;
+
+  private final ConcurrentHashMap<String, DefaultQuery> compiledQueries =
+      new ConcurrentHashMap<String, DefaultQuery>();
+
+  private volatile boolean isCompiledQueryCleanupThreadStarted = false;
+
+  private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
+
+  private SystemTimer.SystemTimerTask clientPingTask;
+
+  private final SocketCloser socketCloser;
+
+  private static final long CLIENT_PING_TASK_PERIOD =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
+
+  private static final long CLIENT_PING_TASK_COUNTER =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
+
+  public long getLogFrequency() {
+    return this.logFrequency;
+  }
+
+  /**
    * @return the haContainer
    */
   public Map getHaContainer() {
-    return this.haContainer;
+    return haContainer;
   }
 
-  private void initHaContainer(List overflowAttributesList) {
+  public void initHaContainer(List overflowAttributesList) {
     // lazily initialize haContainer in case this CCN instance was created by a gateway receiver
     if (overflowAttributesList != null
         && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList.get(0))) {
-      this.haContainer = new HAContainerRegion(this.cache.getRegion(Region.SEPARATOR
-          + CacheServerImpl.clientMessagesRegion(this.cache, (String) overflowAttributesList.get(0),
-              (Integer) overflowAttributesList.get(1), (Integer) overflowAttributesList.get(2),
+      haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
+          + CacheServerImpl.clientMessagesRegion(_cache, (String) overflowAttributesList.get(0),
+              ((Integer) overflowAttributesList.get(1)).intValue(),
+              ((Integer) overflowAttributesList.get(2)).intValue(),
               (String) overflowAttributesList.get(3), (Boolean) overflowAttributesList.get(4))));
     } else {
-      this.haContainer = new HAContainerMap(new ConcurrentHashMap());
+      haContainer = new HAContainerMap(new ConcurrentHashMap());
     }
-    assert this.haContainer != null;
+    assert haContainer != null;
 
     if (logger.isDebugEnabled()) {
-      logger.debug("ha container ({}) has been created.", this.haContainer.getName());
+      logger.debug("ha container ({}) has been created.", haContainer.getName());
     }
   }
 
-  void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
-    this.blackListedClients.add(proxyID);
+  private final Set blackListedClients = new CopyOnWriteArraySet();
+
+  public void addToBlacklistedClient(ClientProxyMembershipID proxyID) {
+    blackListedClients.add(proxyID);
     // ensure that cache and distributed system state are current and open
-    getCache();
+    this.getCache();
     new ScheduledThreadPoolExecutor(1).schedule(new ExpireBlackListTask(proxyID), 120,
         TimeUnit.SECONDS);
   }
 
-  Set getBlacklistedClient() {
-    return this.blackListedClients;
+  public Set getBlacklistedClient() {
+    return blackListedClients;
   }
 
   /**
-   * @param cache the cache to set
+   * @param _cache the _cache to set
    */
-  private void setCache(InternalCache cache) {
-    this.cache = cache;
+  private void setCache(InternalCache _cache) {
+    this._cache = _cache;
   }
 
-  /**
-   * Non-static inner class ExpireBlackListTask
-   */
   private class ExpireBlackListTask extends PoolTask {
-    private final ClientProxyMembershipID proxyID;
+    private ClientProxyMembershipID proxyID;
 
-    ExpireBlackListTask(ClientProxyMembershipID proxyID) {
+    public ExpireBlackListTask(ClientProxyMembershipID proxyID) {
       this.proxyID = proxyID;
     }
 
     @Override
     public void run2() {
-      if (blackListedClients.remove(this.proxyID)) {
+      if (blackListedClients.remove(proxyID)) {
         if (logger.isDebugEnabled()) {
-          logger.debug("{} client is no longer blacklisted", this.proxyID);
+          logger.debug("{} client is no longer blacklisted", proxyID);
         }
       }
     }
   }
-
 }
+


Mime
View raw message