geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [21/22] geode git commit: Create ClientCachePutBench
Date Thu, 13 Apr 2017 17:18:42 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 3b0c2ff..379b65b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -43,7 +43,6 @@ import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.ClientSession;
 import org.apache.geode.cache.DynamicRegionFactory;
@@ -111,7 +110,6 @@ import org.apache.shiro.util.ThreadState;
  * It queues messages to be sent from the server to the client. It then reads those messages from
  * the queue and sends them to the client.
  *
- *
  * @since GemFire 4.2
  */
 @SuppressWarnings("synthetic-access")
@@ -119,155 +117,127 @@ public class CacheClientProxy implements ClientSession {
   private static final Logger logger = LogService.getLogger();
 
   /**
-   * The socket between the server and the client
-   */
-  protected Socket _socket;
-
-  private final AtomicBoolean _socketClosed = new AtomicBoolean();
-
-  /**
-   * A communication buffer used by each message we send to the client
+   * Notify the region when a client interest registration occurs. This tells the region to update
+   * access time when an update is to be pushed to a client. It is enabled only for
+   * <code>PartitionedRegion</code>s currently.
    */
-  protected ByteBuffer _commBuffer;
+  private static final boolean NOTIFY_REGION_ON_INTEREST =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
 
   /**
-   * The remote host's IP address string (cached for convenience)
+   * The number of times to peek on shutdown before giving up and shutting down
    */
-  protected String _remoteHostAddress;
+  private static final int MAXIMUM_SHUTDOWN_PEEKS =
+      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50);
 
   /**
-   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+   * Default value for slow starting time of dispatcher
    */
-  protected volatile boolean isMarkedForRemoval = false;
+  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
 
   /**
-   * @see #isMarkedForRemoval
+   * Key in the system property from which the slow starting time value will be retrieved
    */
-  protected final Object isMarkedForRemovalLock = new Object();
+  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
 
   /**
-   * The proxy id of the client represented by this proxy
+   * TODO: delete this and rewrite the tests that use this. NOTE: this is NOT thread safe.
    */
-  protected ClientProxyMembershipID proxyID;
+  private static TestHook testHook;
 
   /**
-   * The GemFire cache
+   * TODO: delete this and rewrite the test that uses this. A debug flag used for testing backward
+   * compatibility.
    */
-  protected final GemFireCacheImpl _cache;
+  private static boolean afterMessageCreationForTesting = false;
 
   /**
-   * The list of keys that the client represented by this proxy is interested in (stored by region)
+   * TODO: delete this and rewrite the test that uses this. For testing purposes, delays the start
+   * of the dispatcher thread.
    */
-  protected final ClientInterestList[] cils = new ClientInterestList[2];
+  private static boolean isSlowStartForTesting = false;
 
-  /**
-   * A thread that dispatches messages to the client
-   */
-  protected volatile MessageDispatcher _messageDispatcher;
+  private final AtomicBoolean socketClosed = new AtomicBoolean();
 
   /**
-   * The statistics for this proxy
+   * @see #isMarkedForRemoval
    */
-  protected final CacheClientProxyStats _statistics;
-
-  protected final AtomicReference _durableExpirationTask = new AtomicReference();
-
-  protected SystemTimer durableTimer;
+  private final Object isMarkedForRemovalLock = new Object();
 
   /**
-   * Whether this dispatcher is paused
+   * The GemFire cache
    */
-  protected volatile boolean _isPaused = true;
+  private final GemFireCacheImpl cache;
 
   /**
-   * True if we are connected to a client.
-   */
-  private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
-  /**
-   * True if a marker message is still in the ha queue.
+   * The list of keys that the client represented by this proxy is interested in (stored by region)
    */
-  private boolean markerEnqueued = false;
+  private final ClientInterestList[] cils = new ClientInterestList[2];
 
   /**
-   * The number of times to peek on shutdown before giving up and shutting down
+   * The statistics for this proxy
    */
-  protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
-      .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue();
+  private final CacheClientProxyStats _statistics;
 
-  /**
-   * The number of milliseconds to wait for an offering to the message queue
-   */
-  protected static final int MESSAGE_OFFER_TIME = 0;
-
-  /**
-   * The default maximum message queue size
-   */
-  // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+  private final AtomicReference _durableExpirationTask = new AtomicReference();
 
   /** The message queue size */
-  protected final int _maximumMessageCount;
+  private final int _maximumMessageCount;
 
   /**
    * The time (in seconds ) after which a message in the client queue will expire.
    */
-  protected final int _messageTimeToLive;
+  private final int _messageTimeToLive;
 
   /**
    * The <code>CacheClientNotifier</code> registering this proxy.
    */
-  protected final CacheClientNotifier _cacheClientNotifier;
+  private final CacheClientNotifier cacheClientNotifier;
 
-  /**
-   * Defaults to true; meaning do some logging of dropped client notification messages. Set the
-   * system property to true to cause dropped messages to NOT be logged.
-   */
-  protected static final boolean LOG_DROPPED_MSGS =
-      !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings");
+  private final Object clientUserAuthsLock = new Object();
 
   /**
-   * for testing purposes, delays the start of the dispatcher thread
+   * The AcceptorImpl identifier to which the proxy is connected.
    */
-  public static boolean isSlowStartForTesting = false;
+  private final long _acceptorId;
 
-  /**
-   * Default value for slow starting time of dispatcher
-   */
-  private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+  /** acceptor's setting for notifyBySubscription */
+  private final boolean notifyBySubscription;
+
+  private final Object queuedEventsSync = new Object();
 
   /**
-   * Key in the system property from which the slow starting time value will be retrieved
+   * A counter that keeps track of how many task iterations that have occurred since the last ping
+   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
+   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
    */
-  private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+  private final AtomicInteger pingCounter = new AtomicInteger();
 
-  private boolean isPrimary;
+  private final Object drainLock = new Object();
 
-  /** @since GemFire 5.7 */
-  protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+  private final Object drainsInProgressLock = new Object();
+
+  private final SecurityService securityService;
 
   /**
-   * Flag to indicate whether to keep a durable client's queue alive
+   * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
    */
-  boolean keepalive = false;
-
-  private AccessControl postAuthzCallback;
-  private Subject subject;
+  private volatile boolean isMarkedForRemoval = false;
 
   /**
-   * For multiuser environment..
+   * A thread that dispatches messages to the client
    */
-  private ClientUserAuths clientUserAuths;
+  private volatile MessageDispatcher _messageDispatcher;
 
-  private final Object clientUserAuthsLock = new Object();
+  /**
+   * Whether this dispatcher is paused
+   */
+  private volatile boolean _isPaused = true;
 
   /**
-   * The version of the client
+   * True if we are connected to a client.
    */
-  private Version clientVersion;
+  private volatile boolean connected = false;
 
   /**
    * A map of region name as key and integer as its value. Basically, it stores the names of the
@@ -278,42 +248,60 @@ public class CacheClientProxy implements ClientSession {
    */
   private volatile Map regionsWithEmptyDataPolicy = new HashMap();
 
+  /** To queue the events arriving during message dispatcher initialization */
+  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
+      new ConcurrentLinkedQueue<Conflatable>();
+
+  private volatile boolean messageDispatcherInit = false;
+
   /**
-   * A debug flag used for testing Backward compatibility
+   * The socket between the server and the client
    */
-  public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+  private Socket socket;
 
   /**
-   * Notify the region when a client interest registration occurs. This tells the region to update
-   * access time when an update is to be pushed to a client. It is enabled only for
-   * <code>PartitionedRegion</code>s currently.
+   * A communication buffer used by each message we send to the client
    */
-  protected static final boolean NOTIFY_REGION_ON_INTEREST =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
+  private ByteBuffer _commBuffer;
 
   /**
-   * The AcceptorImpl identifier to which the proxy is connected.
+   * The remote host's IP address string (cached for convenience)
    */
-  private final long _acceptorId;
+  private String _remoteHostAddress;
 
-  /** acceptor's setting for notifyBySubscription */
-  private final boolean notifyBySubscription;
+  /**
+   * The proxy id of the client represented by this proxy
+   */
+  private ClientProxyMembershipID proxyID;
 
-  /** To queue the events arriving during message dispatcher initialization */
-  private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
-      new ConcurrentLinkedQueue<Conflatable>();
+  /**
+   * True if a marker message is still in the ha queue.
+   */
+  private boolean markerEnqueued = false;
 
-  private final Object queuedEventsSync = new Object();
+  private boolean isPrimary;
 
-  private volatile boolean messageDispatcherInit = false;
+  /** @since GemFire 5.7 */
+  private byte clientConflation = HandShake.CONFLATION_DEFAULT;
 
   /**
-   * A counter that keeps track of how many task iterations that have occurred since the last ping
-   * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
-   * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
+   * Flag to indicate whether to keep a durable client's queue alive
    */
-  private final AtomicInteger pingCounter = new AtomicInteger();
+  private boolean keepalive = false;
+
+  private AccessControl postAuthzCallback;
 
+  private Subject subject;
+
+  /**
+   * For multiuser environment..
+   */
+  private ClientUserAuths clientUserAuths;
+
+  /**
+   * The version of the client
+   */
+  private Version clientVersion;
 
   /** Date on which this instances was created */
   private Date creationDate;
@@ -321,52 +309,75 @@ public class CacheClientProxy implements ClientSession {
   /**
    * true when the durable client associated with this proxy is being restarted and prevents cqs
    * from being closed and drained
-   **/
+   */
   private boolean drainLocked = false;
-  private final Object drainLock = new Object();
 
   /** number of cq drains that are currently in progress **/
   private int numDrainsInProgress = 0;
-  private final Object drainsInProgressLock = new Object();
 
-  private SecurityService securityService = SecurityService.getSecurityService();
+  static CacheClientProxy createCacheClientProxy(final CacheClientNotifier ccn,
+      final GemFireCacheImpl cache, final StatisticsFactory statsFactory,
+      final SecurityService securityService, final Socket socket,
+      final ClientProxyMembershipID proxyID, final boolean isPrimary, final byte clientConflation,
+      final Version clientVersion, final long acceptorId, final boolean notifyBySubscription) {
+
+    CacheClientProxy cacheClientProxy =
+        new CacheClientProxy(ccn, cache, statsFactory, securityService, socket, proxyID, isPrimary,
+            clientConflation, clientVersion, acceptorId, notifyBySubscription);
+
+    // Create the interest list
+    cacheClientProxy.cils[RegisterInterestTracker.interestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.proxyID);
+    // Create the durable interest list
+    cacheClientProxy.cils[RegisterInterestTracker.durableInterestListIndex] =
+        new ClientInterestList(cacheClientProxy, cacheClientProxy.getDurableId());
+
+    return cacheClientProxy;
+  }
 
   /**
    * Constructor.
    *
    * @param ccn The <code>CacheClientNotifier</code> registering this proxy
+   * @param cache
    * @param socket The socket between the server and the client
    * @param proxyID representing the Connection Proxy of the clien
    * @param isPrimary The boolean stating whether this prozxy is primary
-   * @throws CacheException {
-   */
-  protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
-      ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
-      Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException {
+   * @param clientConflation
+   * @param clientVersion
+   */
+  private CacheClientProxy(final CacheClientNotifier ccn, final GemFireCacheImpl cache,
+      final StatisticsFactory statsFactory, final SecurityService securityService,
+      final Socket socket, final ClientProxyMembershipID proxyID, final boolean isPrimary,
+      final byte clientConflation, final Version clientVersion, final long acceptorId,
+      final boolean notifyBySubscription) {
     initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
-    this._cacheClientNotifier = ccn;
-    this._cache = (GemFireCacheImpl) ccn.getCache();
+    this.cacheClientNotifier = ccn;
+    this.cache = cache;
+    this.securityService = securityService;
     this._maximumMessageCount = ccn.getMaximumMessageCount();
     this._messageTimeToLive = ccn.getMessageTimeToLive();
     this._acceptorId = acceptorId;
     this.notifyBySubscription = notifyBySubscription;
-    StatisticsFactory factory = this._cache.getDistributedSystem();
+
     this._statistics =
-        new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId()
-            + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
+        new CacheClientProxyStats(statsFactory, "id_" + this.proxyID.getDistributedMember().getId()
+            + "_at_" + this._remoteHostAddress + ":" + this.socket.getPort());
 
-    // Create the interest list
-    this.cils[RegisterInterestTracker.interestListIndex] =
-        new ClientInterestList(this, this.proxyID);
-    // Create the durable interest list
-    this.cils[RegisterInterestTracker.durableInterestListIndex] =
-        new ClientInterestList(this, this.getDurableId());
     this.postAuthzCallback = null;
-    this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+    this.cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
     this.creationDate = new Date();
     initializeClientAuths();
   }
 
+  boolean isClientConflationOn() {
+    return this.clientConflation == HandShake.CONFLATION_ON;
+  }
+
+  boolean isClientConflationDefault() {
+    return this.clientConflation == HandShake.CONFLATION_DEFAULT; // was CONFLATION_ON (copy/paste)
+  }
+
   private void initializeClientAuths() {
     if (AcceptorImpl.isPostAuthzCallbackPresent())
       this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
@@ -411,13 +422,13 @@ public class CacheClientProxy implements ClientSession {
 
   private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
       byte cc, Version vers) {
-    this._socket = socket;
+    this.socket = socket;
     this.proxyID = pid;
     this.connected = true;
     {
       int bufSize = 1024;
       try {
-        bufSize = _socket.getSendBufferSize();
+        bufSize = this.socket.getSendBufferSize();
         if (bufSize < 1024) {
           bufSize = 1024;
         }
@@ -450,7 +461,6 @@ public class CacheClientProxy implements ClientSession {
     return this.notifyBySubscription;
   }
 
-
   /**
    * Returns the DistributedMember represented by this proxy
    */
@@ -458,47 +468,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }
@@ -522,11 +491,11 @@ public class CacheClientProxy implements ClientSession {
    * @return the socket between the server and the client
    */
   protected Socket getSocket() {
-    return this._socket;
+    return this.socket;
   }
 
   public String getSocketHost() {
-    return this._socket.getInetAddress().getHostAddress();
+    return this.socket.getInetAddress().getHostAddress();
   }
 
   protected ByteBuffer getCommBuffer() {
@@ -548,7 +517,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the remote host's port
    */
   public int getRemotePort() {
-    return this._socket.getPort();
+    return this.socket.getPort();
   }
 
   /**
@@ -593,7 +562,7 @@ public class CacheClientProxy implements ClientSession {
             this.isMarkedForRemovalLock.wait();
           } catch (InterruptedException e) {
             interrupted = true;
-            this._cache.getCancelCriterion().checkCancelInProgress(e);
+            this.cache.getCancelCriterion().checkCancelInProgress(e);
           }
         } // while
       } finally {
@@ -621,7 +590,7 @@ public class CacheClientProxy implements ClientSession {
    * @return the GemFire cache
    */
   public GemFireCacheImpl getCache() {
-    return this._cache;
+    return this.cache;
   }
 
   public Set<String> getInterestRegisteredRegions() {
@@ -649,7 +618,7 @@ public class CacheClientProxy implements ClientSession {
    * @return this proxy's <code>CacheClientNotifier</code>
    */
   protected CacheClientNotifier getCacheClientNotifier() {
-    return this._cacheClientNotifier;
+    return this.cacheClientNotifier;
   }
 
   /**
@@ -852,8 +821,8 @@ public class CacheClientProxy implements ClientSession {
         }
       }
     } catch (Exception ex) {
-      if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
-        this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
+      if (this.cache.getSecurityLoggerI18n().warningEnabled()) {
+        this.cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
             new Object[] {this, ex});
       }
     }
@@ -991,9 +960,9 @@ public class CacheClientProxy implements ClientSession {
   }
 
   private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
+    if (this.socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+      this.cacheClientNotifier.getSocketCloser().asyncClose(this.socket, this._remoteHostAddress,
           null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
@@ -1008,7 +977,7 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
-        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+        this.cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }
@@ -1124,7 +1093,7 @@ public class CacheClientProxy implements ClientSession {
       InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
     // Create a client interest message for the keyOfInterest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         policy.getOrdinal(), isDurable, !receiveValues, ClientInterestMessageImpl.REGISTER);
 
     // Notify all secondary proxies of a change in interest
@@ -1146,7 +1115,7 @@ public class CacheClientProxy implements ClientSession {
       String regionName, Object keyOfInterest) {
     // Get the initial value
     Get70 request = (Get70) Get70.getCommand();
-    LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+    LocalRegion lr = (LocalRegion) this.cache.getRegion(regionName);
     Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, null);
     boolean isObject = entry.isObject;
     byte[] value = null;
@@ -1170,7 +1139,7 @@ public class CacheClientProxy implements ClientSession {
       EventID eventId = null;
       if (clientInterestMessage == null) {
         // If the clientInterestMessage is null, create a new event id
-        eventId = new EventID(this._cache.getDistributedSystem());
+        eventId = new EventID(this.cache.getDistributedSystem());
       } else {
         // If the clientInterestMessage is not null, base the event id off its event id to fix
         // GEM-794.
@@ -1239,7 +1208,7 @@ public class CacheClientProxy implements ClientSession {
       boolean isDurable, boolean receiveValues, int interestType) {
     // Notify all secondary proxies of a change in interest
     ClientInterestMessageImpl message = new ClientInterestMessageImpl(
-        new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+        new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
         (byte) 0, isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER);
     notifySecondariesOfInterestChange(message);
 
@@ -1269,17 +1238,9 @@ public class CacheClientProxy implements ClientSession {
           .append("->").append(InterestType.getString(message.getInterestType()));
       logger.debug(buffer.toString());
     }
-    this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+    this.cacheClientNotifier.deliverInterestChange(this.proxyID, message);
   }
 
-  /*
-   * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try {
-   * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch
-   * (RegionDestroyedException e) {
-   * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" +
-   * keyOfInterest, e); } }
-   */
-
   /**
    * Registers interest in the input region name and key
    *
@@ -1293,7 +1254,7 @@ public class CacheClientProxy implements ClientSession {
     cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
     if (flushState) {
       flushForInterestRegistration(regionName,
-          this._cache.getDistributedSystem().getDistributedMember());
+          this.cache.getDistributedSystem().getDistributedMember());
     }
     HARegionQueue queue = getHARegionQueue();
     if (queue != null) { // queue is null during initialization
@@ -1306,7 +1267,7 @@ public class CacheClientProxy implements ClientSession {
    * interest. During queue creation it is the queue's image provider.
    */
   public void flushForInterestRegistration(String regionName, DistributedMember target) {
-    Region r = this._cache.getRegion(regionName);
+    Region r = this.cache.getRegion(regionName);
     if (r == null) {
       if (logger.isDebugEnabled()) {
         logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
@@ -1320,7 +1281,7 @@ public class CacheClientProxy implements ClientSession {
       if (r instanceof PartitionedRegion) {
         // need to flush all buckets. SFO should be changed to target buckets
         // belonging to a particular PR, but it doesn't have that option right now
-        sfo = new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager());
+        sfo = new StateFlushOperation(this.cache.getDistributedSystem().getDistributionManager());
       } else {
         sfo = new StateFlushOperation((DistributedRegion) r);
       }
@@ -1378,7 +1339,7 @@ public class CacheClientProxy implements ClientSession {
     if (getHARegionQueue() != null) {
       if (flushState) {
         flushForInterestRegistration(regionName,
-            this._cache.getDistributedSystem().getDistributedMember());
+            this.cache.getDistributedSystem().getDistributedMember());
       }
       getHARegionQueue().setHasRegisteredInterest(true);
     }
@@ -1643,7 +1604,7 @@ public class CacheClientProxy implements ClientSession {
     if (logger.isDebugEnabled()) {
       logger.debug("About to send message directly to {}", this);
     }
-    if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+    if (this._messageDispatcher != null && this.socket != null && !this.socket.isClosed()) {
       // If the socket is open, send the message to it
       this._messageDispatcher.sendMessageDirectly(message);
       if (logger.isDebugEnabled()) {
@@ -1759,7 +1720,7 @@ public class CacheClientProxy implements ClientSession {
     if (this.isPrimary) {
       // Add the marker to the queue
       if (!processedMarker) {
-        EventID eventId = new EventID(this._cache.getDistributedSystem());
+        EventID eventId = new EventID(this.cache.getDistributedSystem());
         this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
       }
 
@@ -1810,30 +1771,18 @@ public class CacheClientProxy implements ClientSession {
   @Override
   public String toString() {
     StringBuffer buffer = new StringBuffer();
-    buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
-        .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
-        .append("; version=").append(clientVersion).append("]");
+    buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+        .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+        .append(clientVersion).append("]");
     return buffer.toString();
   }
 
   public String getState() {
     StringBuffer buffer = new StringBuffer();
-    buffer.append("CacheClientProxy[")
-        // .append("client proxy id=")
-        .append(this.proxyID)
-        // .append("; client host name=")
-        // .append(this._socket.getInetAddress().getCanonicalHostName())
-        // .append("; client host address=")
-        // .append(this._remoteHostAddress)
-        .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
-        .append("; version=").append(clientVersion).append("; paused=").append(isPaused())
-        .append("; alive=").append(isAlive()).append("; connected=").append(isConnected())
+    buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+        .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+        .append(clientVersion).append("; paused=").append(isPaused()).append("; alive=")
+        .append(isAlive()).append("; connected=").append(isConnected())
         .append("; isMarkedForRemoval=").append(isMarkedForRemoval).append("]");
 
     if (_messageDispatcher != null && isAlive()) {
@@ -1844,15 +1793,7 @@ public class CacheClientProxy implements ClientSession {
   }
 
   public boolean isPrimary() {
-    // boolean primary = this._messageDispatcher.isAlive()
-    // || this._messageDispatcher._messageQueue.isPrimary();
-    boolean primary = this.isPrimary;
-    // System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
-    // System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " +
-    // this._messageDispatcher._messageQueue.isPrimary());
-    // System.out.println(this + ": IS PRIMARY: " + primary);
-    return primary;
-    // return this.isPrimary ;
+    return this.isPrimary;
   }
 
   protected boolean basicIsPrimary() {
@@ -1863,16 +1804,10 @@ public class CacheClientProxy implements ClientSession {
     this.isPrimary = isPrimary;
   }
 
-  // private static int nextId = 0;
-  // static protected int getNextId() {
-  // synchronized (CacheClientProxy.class) {
-  // return ++nextId;
-  // }
-  // }
-  /*
+  /**
    * Return this client's HA region queue
    * 
-   * @returns - HARegionQueue of the client
+   * @return HARegionQueue of the client
    */
   public HARegionQueue getHARegionQueue() {
     if (this._messageDispatcher != null) {
@@ -1881,7 +1816,6 @@ public class CacheClientProxy implements ClientSession {
     return null;
   }
 
-
   /**
    * Reinitialize a durable <code>CacheClientProxy</code> with a new client.
    * 
@@ -1952,7 +1886,7 @@ public class CacheClientProxy implements ClientSession {
 
         // Close the proxy
         terminateDispatching(false);
-        _cacheClientNotifier._statistics.incQueueDroppedCount();
+        cacheClientNotifier._statistics.incQueueDroppedCount();
 
         /**
          * Setting the expiration task to null again and cancelling existing one, if any. See
@@ -1978,7 +1912,7 @@ public class CacheClientProxy implements ClientSession {
 
     };
     if (this._durableExpirationTask.compareAndSet(null, task)) {
-      _cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
+      cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
     }
   }
 
@@ -1996,11 +1930,131 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
+  public static void setTestHook(TestHook value) {
+    testHook = value;
+  }
+
+  public static void unsetTestHook() {
+    testHook = null;
+  }
+
+  public static TestHook getTestHook() {
+    return testHook;
+  }
+
+  public static void setSlowStartForTesting() {
+    isSlowStartForTesting = true;
+  }
+
+  static void unsetSlowStartForTesting() {
+    isSlowStartForTesting = false;
+  }
+
+  static void setAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = true;
+  }
+
+  static void unsetAfterMessageCreationForTesting() {
+    afterMessageCreationForTesting = false;
+  }
+
+  Socket getSocketForTesting() {
+    return this.socket;
+  }
+
+  ClientInterestList[] getClientInterestListForTesting() {
+    return this.cils;
+  }
+
+  MessageDispatcher getMessageDispatcherForTesting() {
+    return this._messageDispatcher;
+  }
+
+  /**
+   * Returns the current number of CQs the client installed.
+   *
+   * @return int the current count of CQs for this client
+   */
+  public int getCqCount() {
+    synchronized (this) {
+      return this._statistics.getCqCount();
+    }
+  }
+
+  /**
+   * Increment the number of CQs the client installed
+   *
+   */
+  public void incCqCount() {
+    synchronized (this) {
+      this._statistics.incCqCount();
+    }
+  }
+
+  /**
+   * Decrement the number of CQs the client installed
+   *
+   */
+  public synchronized void decCqCount() {
+    synchronized (this) {
+      this._statistics.decCqCount();
+    }
+  }
+
+  /**
+   * Returns true if the client has one CQ
+   *
+   * @return true if the client has one CQ
+   */
+  public boolean hasOneCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 1;
+    }
+  }
+
+  /**
+   * Returns true if the client has no CQs
+   *
+   * @return true if the client has no CQs
+   */
+  public boolean hasNoCq() {
+    synchronized (this) {
+      return this._statistics.getCqCount() == 0;
+    }
+  }
+
+  /**
+   * Get map of regions with empty data policy
+   *
+   * @since GemFire 6.1
+   */
+  public Map getRegionsWithEmptyDataPolicy() {
+    return regionsWithEmptyDataPolicy;
+  }
+
+  public int incrementAndGetPingCounter() {
+    int pingCount = this.pingCounter.incrementAndGet();
+    return pingCount;
+  }
+
+  public void resetPingCounter() {
+    this.pingCounter.set(0);
+  }
+
+  /**
+   * Returns the number of seconds that have elapsed since the client proxy was created.
+   *
+   * @since GemFire 7.0
+   */
+  public long getUpTime() {
+    return (System.currentTimeMillis() - this.creationDate.getTime()) / 1000;
+  }
+
   /**
    * Class <code>ClientInterestList</code> provides a convenient interface for manipulating client
    * interest information.
    */
-  static protected class ClientInterestList {
+  static class ClientInterestList {
 
     final CacheClientProxy ccp;
 
@@ -2035,7 +2089,7 @@ public class CacheClientProxy implements ClientSession {
       }
       Set keysRegistered = null;
       synchronized (this.interestListLock) {
-        LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true);
+        LocalRegion r = (LocalRegion) this.ccp.cache.getRegion(regionName, true);
         if (r == null) {
           throw new RegionDestroyedException("Region could not be found for interest registration",
               regionName);
@@ -2059,7 +2113,7 @@ public class CacheClientProxy implements ClientSession {
 
     protected FilterProfile getProfile(String regionName) {
       try {
-        return this.ccp._cache.getFilterProfile(regionName);
+        return this.ccp.cache.getFilterProfile(regionName);
       } catch (CancelException e) {
         return null;
       }
@@ -2225,7 +2279,6 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-
   /**
    * Class <code>MessageDispatcher</code> is a <code>Thread</code> that processes messages bound for
    * the client by taking messsages from the message queue and sending them to the client over the
@@ -2238,34 +2291,17 @@ public class CacheClientProxy implements ClientSession {
      */
     protected final HARegionQueue _messageQueue;
 
-    // /**
-    // * An int used to keep track of the number of messages dropped for logging
-    // * purposes. If greater than zero then a warning has been logged about
-    // * messages being dropped.
-    // */
-    // private int _numberOfMessagesDropped = 0;
-
     /**
      * The proxy for which this dispatcher is processing messages
      */
     private final CacheClientProxy _proxy;
 
-    // /**
-    // * The conflator faciliates message conflation
-    // */
-    // protected BridgeEventConflator _eventConflator;
-
     /**
      * Whether the dispatcher is stopped
      */
     private volatile boolean _isStopped = true;
 
     /**
-     * guarded.By _pausedLock
-     */
-    // boolean _isPausedDispatcher = false;
-
-    /**
      * A lock object used to control pausing this dispatcher
      */
     protected final Object _pausedLock = new Object();
@@ -2278,11 +2314,6 @@ public class CacheClientProxy implements ClientSession {
     private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
 
     private final Lock socketWriteLock = socketLock.writeLock();
-    // /**
-    // * A boolean verifying whether a warning has already been issued if the
-    // * message queue has reached its capacity.
-    // */
-    // private boolean _messageQueueCapacityReachedWarning = false;
 
     /**
      * Constructor.
@@ -2307,7 +2338,7 @@ public class CacheClientProxy implements ClientSession {
         HARegionQueueAttributes harq = new HARegionQueueAttributes();
         harq.setBlockingQueueCapacity(proxy._maximumMessageCount);
         harq.setExpiryTime(proxy._messageTimeToLive);
-        ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
+        ((HAContainerWrapper) proxy.cacheClientNotifier.getHaContainer())
             .putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());
         boolean createDurableQueue = proxy.proxyID.isDurable();
         boolean canHandleDelta = (proxy.clientVersion.compareTo(Version.GFE_61) >= 0)
@@ -2318,7 +2349,7 @@ public class CacheClientProxy implements ClientSession {
         }
         this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
             getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue,
-            proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
+            proxy.cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
             this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta);
         // Check if interests were registered during HARegion GII.
         if (this._proxy.hasRegisteredInterested()) {
@@ -2409,10 +2440,6 @@ public class CacheClientProxy implements ClientSession {
             Thread.sleep(500);
           } catch (InterruptedException e) {
             interrupted = true;
-            /*
-             * GemFireCache c = (GemFireCache)_cache;
-             * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);
-             */
           } catch (CancelException e) {
             break;
           } catch (CacheException e) {
@@ -2507,7 +2534,7 @@ public class CacheClientProxy implements ClientSession {
       ClientMessage clientMessage = null;
       while (!isStopped()) {
         // SystemFailure.checkFailure(); DM's stopper does this
-        if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
+        if (this._proxy.cache.getCancelCriterion().isCancelInProgress()) {
           break;
         }
         try {
@@ -2756,9 +2783,6 @@ public class CacheClientProxy implements ClientSession {
       }
       Message message = null;
 
-      // byte[] latestValue =
-      // this._eventConflator.getLatestValue(clientMessage);
-
       if (clientMessage instanceof ClientUpdateMessage) {
         byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue();
         if (logger.isTraceEnabled()) {
@@ -2775,7 +2799,7 @@ public class CacheClientProxy implements ClientSession {
 
         message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue);
 
-        if (AFTER_MESSAGE_CREATION_FLAG) {
+        if (afterMessageCreationForTesting) {
           ClientServerObserver bo = ClientServerObserverHolder.getInstance();
           bo.afterMessageCreation(message);
         }
@@ -2783,37 +2807,9 @@ public class CacheClientProxy implements ClientSession {
         message = clientMessage.getMessage(getProxy(), true /* notify */);
       }
 
-      // //////////////////////////////
-      // TEST CODE BEGIN (Throws exception to test closing proxy)
-      // if (true) throw new IOException("test");
-      // TEST CODE END
-      // //////////////////////////////
-      // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-      // latestValue);
-      // Message message = clientMessage.getMessage(); removed during merge.
-      // BugFix for BUG#38206 and BUG#37791
       if (!this._proxy.isPaused()) {
         sendMessage(message);
 
-        // //////////////////////////////
-        // TEST CODE BEGIN (Throws exception to test closing proxy)
-        // if (true) throw new IOException("test");
-        // TEST CODE END
-        // //////////////////////////////
-        // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
-        // latestValue);
-        // Message message = clientMessage.getMessage(); removed during merge.
-        // message.setComms(getSocket(), getCommBuffer(), getStatistics());
-        // message.send();
-
-        // //////////////////////////////
-        // TEST CODE BEGIN (Introduces random wait in client)
-        // Sleep a random number of ms
-        // java.util.Random rand = new java.util.Random();
-        // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {}
-        // TEST CODE END
-        // //////////////////////////////
-
         if (logger.isTraceEnabled()) {
           logger.trace("{}: Dispatched {}", this, clientMessage);
         }
@@ -2855,7 +2851,7 @@ public class CacheClientProxy implements ClientSession {
       try {
         this._messageQueue.put(clientMessage);
         if (this._proxy.isPaused() && this._proxy.isDurable()) {
-          this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
+          this._proxy.cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
           if (logger.isDebugEnabled()) {
             logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
           }
@@ -2959,7 +2955,7 @@ public class CacheClientProxy implements ClientSession {
       this._pausedLock.notifyAll();
     }
 
-    protected Object deserialize(byte[] serializedBytes) {
+    private Object deserialize(byte[] serializedBytes) {
       Object deserializedObject = serializedBytes;
       // This is a debugging method so ignore all exceptions like
       // ClassNotFoundException
@@ -2983,89 +2979,7 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  /**
-   * Returns the current number of CQS the client installed.
-   *
-   * @return int the current count of CQs for this client
-   */
-  public int getCqCount() {
-    synchronized (this) {
-      return this._statistics.getCqCount();
-    }
-  }
-
-  /**
-   * Increment the number of CQs the client installed
-   *
-   */
-  public void incCqCount() {
-    synchronized (this) {
-      this._statistics.incCqCount();
-    }
-  }
-
-  /**
-   * Decrement the number of CQs the client installed
-   *
-   */
-  public synchronized void decCqCount() {
-    synchronized (this) {
-      this._statistics.decCqCount();
-    }
-  }
-
-  /**
-   * Returns true if the client has one CQ
-   *
-   * @return true if the client has one CQ
-   */
-  public boolean hasOneCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 1;
-    }
-  }
-
-  /**
-   * Returns true if the client has no CQs
-   *
-   * @return true if the client has no CQs
-   */
-  public boolean hasNoCq() {
-    synchronized (this) {
-      return this._statistics.getCqCount() == 0;
-    }
-  }
-
-  /**
-   * Get map of regions with empty data policy
-   *
-   * @since GemFire 6.1
-   */
-  public Map getRegionsWithEmptyDataPolicy() {
-    return regionsWithEmptyDataPolicy;
-  }
-
-  public int incrementAndGetPingCounter() {
-    int pingCount = this.pingCounter.incrementAndGet();
-    return pingCount;
-  }
-
-  public void resetPingCounter() {
-    this.pingCounter.set(0);
-  }
-
-  /**
-   * Returns the number of seconds that have elapsed since the Client proxy created.
-   * 
-   * @since GemFire 7.0
-   */
-  public long getUpTime() {
-    return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000);
-  }
-
   public interface TestHook {
-    public void doTestHook(String spot);
+    void doTestHook(String spot);
   }
-
-  public static TestHook testHook;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e21a834..6e8f9ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -22,7 +22,11 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CacheClientStatus;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.IncomingGatewayStatus;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -32,7 +36,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.logging.log4j.Logger;
 
 import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..46e43c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -34,11 +34,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
 
 /**
  * This class represents a ConnectionProxy of the CacheClient
- * 
- * 
- * 
  */
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
     implements DataSerializableFixedID, Serializable, Externalizable {
 
   private static final Logger logger = LogService.getLogger();

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 6bbe7b8..7d1603d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -352,8 +352,8 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     byte[] serializedValue = null;
     Message message = null;
     boolean conflation = false;
-    conflation = (proxy.clientConflation == HandShake.CONFLATION_ON)
-        || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && this.shouldBeConflated());
+    conflation = (proxy.isClientConflationOn())
+        || (proxy.isClientConflationDefault() && this.shouldBeConflated());
 
     if (latestValue != null) {
       serializedValue = latestValue;

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 6e119c0..8a51c31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -62,6 +62,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.GemFireConfigException;
 import org.apache.geode.InternalGemFireException;
+import org.apache.geode.LogWriter;
 import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
@@ -1669,8 +1670,8 @@ public class HandShake implements ClientHandShake {
    * not
    */
   public static Object verifyCredentials(String authenticatorMethod, Properties credentials,
-      Properties securityProperties, InternalLogWriter logWriter,
-      InternalLogWriter securityLogWriter, DistributedMember member)
+      Properties securityProperties, LogWriter logWriter, LogWriter securityLogWriter,
+      DistributedMember member)
       throws AuthenticationRequiredException, AuthenticationFailedException {
 
     if (!AcceptorImpl.isAuthenticationRequired()) {
@@ -1702,8 +1703,8 @@ public class HandShake implements ClientHandShake {
 
     String methodName = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     return verifyCredentials(methodName, this.credentials, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), this.id.getDistributedMember());
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(),
+        this.id.getDistributedMember());
   }
 
   public void sendCredentialsForWan(OutputStream out, InputStream in) {
@@ -1731,8 +1732,7 @@ public class HandShake implements ClientHandShake {
     String authenticator = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
     Properties peerWanProps = readCredentials(dis, dos, this.system);
     verifyCredentials(authenticator, peerWanProps, this.system.getSecurityProperties(),
-        (InternalLogWriter) this.system.getLogWriter(),
-        (InternalLogWriter) this.system.getSecurityLogWriter(), member);
+        this.system.getLogWriter(), this.system.getSecurityLogWriter(), member);
   }
 
   private static int getKeySize(String skAlgo) {

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
index 5a229d3..dfce317 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
@@ -151,6 +151,16 @@ public class LogService extends LogManager {
   }
 
   /**
+   * Returns a Logger with the name of {@link #SECURITY_LOGGER_NAME}.
+   *
+   * @return The security Logger.
+   */
+  public static Logger getSecurityLogger() {
+    return new FastLogger(
+        LogManager.getLogger(SECURITY_LOGGER_NAME, GemFireParameterizedMessageFactory.INSTANCE));
+  }
+
+  /**
    * Returns a LogWriterLogger that is decorated with the LogWriter and LogWriterI18n methods.
    * <p>
    * This is the bridge to LogWriter and LogWriterI18n that we need to eventually stop using in

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
new file mode 100644
index 0000000..017e0f5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed;
+
+import org.apache.geode.cache.Cache;
+
+/**
+ * Provides tests a way to access non-public state in ServerLauncher
+ */
+public class ServerLauncherUtils {
+
+  /**
+   * Returns the Cache from an online in-process ServerLauncher instance
+   */
+  public static Cache getCache(final ServerLauncher serverLauncher) {
+    return serverLauncher.getCache();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7aa11b7..8ff541b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -14,21 +14,14 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.After;
@@ -39,7 +32,6 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.net.BindException;
-import java.net.Socket;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -85,7 +77,8 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
       } catch (IllegalArgumentException expected) {
       }
@@ -95,7 +88,7 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST,
-            CacheServer.DEFAULT_TCP_NO_DELAY);
+            CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
         fail("Expected an IllegalArgumentExcption due to max conns of zero");
       } catch (IllegalArgumentException expected) {
       }
@@ -105,12 +98,14 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            this.cache.getCancelCriterion());
         fail("Expecetd a BindException while attaching to the same port");
       } catch (BindException expected) {
       }
@@ -119,7 +114,8 @@ public class AcceptorImplJUnitTest {
           CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
-          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+          this.cache.getCancelCriterion());
       assertEquals(port2, a3.getPort());
       InternalDistributedSystem isystem =
           (InternalDistributedSystem) this.cache.getDistributedSystem();

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
new file mode 100644
index 0000000..7ab539d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+@Category(UnitTest.class)
+public class AcceptorImplTest {
+
+  @Before
+  public void before() throws Exception {
+    DistributionConfigImpl distributionConfig = new DistributionConfigImpl(new Properties());
+    SocketCreatorFactory.setDistributionConfig(distributionConfig);
+  }
+
+  @After
+  public void after() throws Exception {
+    SocketCreatorFactory.close();
+  }
+
+  @Test
+  public void constructWithDefaults() throws Exception {
+    /*
+     * Problems:
+     * 
+     * this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
+     * messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
+     * 
+     * this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+     * this.clientNotifier.getStats());
+     * 
+     * LoggingThreadGroup / ThreadFactory / ThreadPoolExecutor
+     * 
+     * isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+     * 
+     * isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+     * 
+     * 
+     * String postAuthzFactoryName =
+     * this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+     * 
+     */
+
+    int port = 0;
+    String bindHostName = SocketCreator.getLocalHost().getHostName();
+    boolean notifyBySubscription = false;
+    int socketBufferSize = 1;
+    int maximumTimeBetweenPings = 0;
+    InternalCache internalCache = null;
+    int maxConnections = 0;
+    int maxThreads = 0;
+    int maximumMessageCount = 0;
+    int messageTimeToLive = 0;
+    ConnectionListener listener = null;
+    List overflowAttributesList = null;
+    boolean isGatewayReceiver = false;
+    List<GatewayTransportFilter> transportFilter = Collections.emptyList();
+    boolean tcpNoDelay = false;
+    CancelCriterion cancelCriterion = null;
+
+    AcceptorImpl acceptor = new AcceptorImpl(port, bindHostName, notifyBySubscription,
+        socketBufferSize, maximumTimeBetweenPings, internalCache, maxConnections, maxThreads,
+        maximumMessageCount, messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver,
+        transportFilter, tcpNoDelay, cancelCriterion);
+
+    assertThat(acceptor).isNotNull();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
new file mode 100644
index 0000000..8cd7622
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.CacheServerImpl;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides tests a way to access CacheServer, AcceptorImpl and ServerConnection
+ */
+public class CacheServerUtils {
+
+  /**
+   * Looks up the first CacheServer that the given Cache reports.
+   *
+   * @return the element at index 0 of {@code cache.getCacheServers()}
+   */
+  public static CacheServer getCacheServer(final Cache cache) {
+    final List<CacheServer> servers = cache.getCacheServers();
+    return servers.get(0);
+  }
+
+  /**
+   * Casts the given CacheServer to CacheServerImpl and hands back its acceptor.
+   */
+  public static AcceptorImpl getAcceptorImpl(final CacheServer cacheServer) {
+    final CacheServerImpl serverImpl = (CacheServerImpl) cacheServer;
+    return serverImpl.getAcceptor();
+  }
+
+  /**
+   * Returns the first ServerConnection from the acceptor's connection set (no emptiness check).
+   */
+  public static ServerConnection getServerConnection(final CacheServer cacheServer) {
+    final Set<ServerConnection> connections =
+        getAcceptorImpl(cacheServer).getAllServerConnections();
+    return connections.iterator().next();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
index 31f67aa..f4a8cc8 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
@@ -99,7 +99,7 @@ public class ClientConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void setIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", "15000");
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
index 1a76daa..efc0367 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -166,7 +166,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void installObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+    CacheClientProxy.setAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new DelaySendingEvent());
   }
 
@@ -176,7 +176,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
   }
 
   private static void cleanupObserver() {
-    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+    CacheClientProxy.unsetAfterMessageCreationForTesting();
     ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index b4f3185..43330a5 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -62,13 +62,9 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.standalone.VersionManager;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
@@ -1014,7 +1010,8 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
       while (iter_prox.hasNext()) {
         CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
         // CCP should not contain region1
-        Set akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions;
+        Set akr = ccp
+            .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].regions;
         assertNotNull(akr);
         assertTrue(!akr.contains(Region.SEPARATOR + REGION_NAME1));
         // CCP should contain region2
@@ -1352,7 +1349,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
    *
    */
   public static void unsetSlowDispatcherFlag() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
index b1e16ee..275e458 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
@@ -112,7 +112,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    * 
    */
   public static void setIsSlowStart(String milis) {
-    CacheClientProxy.isSlowStartForTesting = true;
+    CacheClientProxy.setSlowStartForTesting();
     System.setProperty("slowStartTimeForTesting", milis);
   }
 
@@ -121,7 +121,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
    *
    */
   public static void unsetIsSlowStart() {
-    CacheClientProxy.isSlowStartForTesting = false;
+    CacheClientProxy.unsetSlowStartForTesting();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
index 544f732..9d60cc7 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -459,7 +459,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
         iter_prox = ccn.getClientProxies().iterator();
         if (iter_prox.hasNext()) {
           proxy = (CacheClientProxy) iter_prox.next();
-          return proxy._messageDispatcher.isAlive();
+          return proxy.getMessageDispatcherForTesting().isAlive();
         } else {
           return false;
         }
@@ -510,7 +510,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
     if (iter_prox.hasNext()) {
       CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
       assertFalse("Dispatcher on secondary should not be alive",
-          proxy._messageDispatcher.isAlive());
+          proxy.getMessageDispatcherForTesting().isAlive());
     }
   }
 
@@ -818,8 +818,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+          Set keysMap =
+              (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                  .getProfile(Region.SEPARATOR + REGION_NAME)
+                  .getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null && keysMap.size() == 2;
         }
 
@@ -830,8 +832,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-          .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+      Set keysMap =
+          (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(2, keysMap.size());
       assertTrue(keysMap.contains(k1));
@@ -879,8 +882,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       wc = new WaitCriterion() {
         @Override
         public boolean done() {
-          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+          Set keysMap =
+              (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                  .getProfile(Region.SEPARATOR + REGION_NAME)
+                  .getKeysOfInterestFor(ccp.getProxyID());
           return keysMap != null;
         }
 
@@ -891,8 +896,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
       };
       Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
 
-      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-          .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+      Set keysMap =
+          (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
       assertNotNull(keysMap);
       assertEquals(1, keysMap.size());
       assertFalse(keysMap.contains(k1));

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
index 6aea509..3585c3e 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
@@ -471,7 +471,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -529,7 +529,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
     } catch (Exception ex) {
       fail("while setting verifyDispatcherIsNotAlive  " + ex);

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
index 041cd38..be9265b 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
@@ -434,8 +434,8 @@ public class InterestListRecoveryDUnitTest extends JUnit4DistributedTestCase {
   public static Set getKeysOfInterestMap(CacheClientProxy proxy, String regionName) {
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]);
     // assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]._keysOfInterest);
-    return proxy.cils[RegisterInterestTracker.interestListIndex].getProfile(regionName)
-        .getKeysOfInterestFor(proxy.getProxyID());
+    return proxy.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+        .getProfile(regionName).getKeysOfInterestFor(proxy.getProxyID());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
index 4a98298..1635fca 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
@@ -189,10 +189,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            if (proxy._messageDispatcher == null) {
+            if (proxy.getMessageDispatcherForTesting() == null) {
               return false;
             }
-            return proxy._messageDispatcher.isAlive();
+            return proxy.getMessageDispatcherForTesting().isAlive();
           }
 
           public String description() {
@@ -245,7 +245,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
       if (iter_prox.hasNext()) {
         CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
         assertFalse("Dispatcher on secondary should not be alive",
-            proxy._messageDispatcher.isAlive());
+            proxy.getMessageDispatcherForTesting().isAlive());
       }
 
     } catch (Exception ex) {
@@ -427,8 +427,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
           String excuse;
 
           public boolean done() {
-            Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
-                .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+            Set keysMap = (Set) ccp
+                .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                    .getProfile(Region.SEPARATOR + REGION_NAME)
+                    .getKeysOfInterestFor(ccp.getProxyID());
             if (keysMap == null) {
               excuse = "keys of interest is null";
               return false;
@@ -446,8 +448,9 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
         };
         Wait.waitForCriterion(wc, 180 * 1000, 2 * 1000, true);
 
-        Set keysMap = ccp.cils[RegisterInterestTracker.interestListIndex]
-            .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+        Set keysMap =
+            ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+                .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
         assertTrue(keysMap.contains(k1));
         assertTrue(keysMap.contains(k2));
 

http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
new file mode 100644
index 0000000..2d900dc
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+@Category(IntegrationTest.class)
+public class ExperimentIntegrationTest {
+
+  private ServerLauncher serverLauncher;
+  private ServerConnection serverConnection;
+
+  /** Launches a server, then builds a ServerConnection around a mocked Socket. */
+  @Before
+  public void before() throws Exception {
+    final int port = getRandomAvailablePort(SOCKET);
+    serverLauncher =
+        new ServerLauncher.Builder().setMemberName("server").setServerPort(port).build();
+    serverLauncher.start();
+
+    final Cache serverCache = getCache(serverLauncher);
+    final AcceptorImpl acceptorImpl = getAcceptorImpl(getCacheServer(serverCache));
+
+    final Socket fakeSocket = mock(Socket.class);
+    when(fakeSocket.getInetAddress()).thenReturn(SocketCreator.getLocalHost());
+
+    serverConnection = new ServerConnection(fakeSocket, serverCache, null, null,
+        DEFAULT_HANDSHAKE_TIMEOUT_MS, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, "client",
+        Acceptor.CLIENT_TO_SERVER, acceptorImpl);
+
+    preConditions();
+  }
+
+  /** Asserts that the launched server reports ONLINE status. */
+  public void preConditions() throws Exception {
+    assertThat(serverLauncher.status().getStatus()).isEqualTo(ONLINE);
+  }
+
+  /** Calls execute on a mocked Command with a mocked Message; nothing is asserted. */
+  @Test
+  public void handlePutFromFakeClient() throws Exception {
+    final Command fakeCommand = mock(Command.class);
+    final Message fakeMessage = mock(Message.class);
+    fakeCommand.execute(fakeMessage, serverConnection);
+  }
+}


Mime
View raw message