geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [28/35] geode git commit: GEODE-2632: refactoring preparations for SecurityService and BaseCommand changes
Date Wed, 31 May 2017 23:15:27 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index 291db65..8915c55 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -94,9 +94,9 @@ import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.security.GemFireSecurityException;
 
 /**
- * <code>CacheClientUpdater</code> is a thread that processes update messages from a cache server
- * and {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local
- * cache based on the contents of those messages.
+ * {@code CacheClientUpdater} is a thread that processes update messages from a cache server and
+ * {@linkplain org.apache.geode.cache.Region#localInvalidate(Object) invalidates} the local cache
+ * based on the contents of those messages.
  * 
  * @since GemFire 3.5
  */
@@ -104,6 +104,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int DEFAULT_SOCKET_BUFFER_SIZE = 32768;
+
   /**
    * true if the constructor successfully created a connection. If false, the run method for this
    * thread immediately exits.
@@ -129,6 +131,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * The input stream of the socket
    */
   private final InputStream in;
+
   /**
    * Failed updater from the endpoint previously known as the primary
    */
@@ -139,12 +142,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private final ByteBuffer commBuffer;
 
-  private boolean commBufferReleased;
+  private boolean commBufferReleased; // TODO: fix synchronization
 
   private final CCUStats stats;
 
   /**
-   * Cache for which we provide service
+   * Cache for which we provide service. TODO: lifecycle and synchronization need work
    */
   private /* final */ InternalCache cache;
 
@@ -175,18 +178,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private boolean isOpCompleted;
 
-  public final static String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
+  public static final String CLIENT_UPDATER_THREAD_NAME = "Cache Client Updater Thread ";
 
   /**
-   * to enable test flag
+   * to enable test flag. TODO: eliminate isUsedByTest
    */
   public static boolean isUsedByTest;
 
   /**
    * Indicates if full value was requested from server as a result of failure in applying delta
-   * bytes.
+   * bytes. TODO: only used for test assertion
    */
-  public static boolean fullValueRequested = false;
+  static boolean fullValueRequested = false;
 
   private final ServerLocation location;
 
@@ -195,8 +198,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   private EndpointManager eManager = null;
   private Endpoint endpoint = null;
 
-  static private final long MAX_CACHE_WAIT = Long
-      .getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120).longValue(); // seconds
+  private static final long MAX_CACHE_WAIT =
+      Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "CacheClientUpdater.MAX_WAIT", 120); // seconds
 
   /**
    * Return true if cache appears
@@ -231,7 +234,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       boolean interrupted = Thread.interrupted();
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignore) {
         interrupted = true;
       } finally {
         if (interrupted) {
@@ -245,12 +248,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   }
 
   /**
-   * Creates a new <code>CacheClientUpdater</code> with a given name that waits for a server to
-   * connect on a given port.
+   * Creates a new {@code CacheClientUpdater} with a given name that waits for a server to connect
+   * on a given port.
    *
    * @param name descriptive name, used for our ThreadGroup
    * @param location the endpoint we represent
-   * @param primary true if our endpoint is primary TODO ask the ep for this?
+   * @param primary true if our endpoint is primary
    * @param ids the system we are distributing messages through
    * 
    * @throws AuthenticationRequiredException when client is not configured to send credentials using
@@ -265,6 +268,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       Endpoint endpoint, int handshakeTimeout, SocketCreator socketCreator)
       throws AuthenticationRequiredException, AuthenticationFailedException,
       ServerRefusedConnectionException {
+
     super(LoggingThreadGroup.createThreadGroup("Client update thread"), name);
     this.setDaemon(true);
     this.system = (InternalDistributedSystem) ids;
@@ -276,6 +280,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     this.eManager = eManager;
     this.endpoint = endpoint;
     this.stats = new CCUStats(this.system, this.location);
+
     // Create the connection...
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
@@ -291,7 +296,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     try {
       // Size of the server-to-client communication socket buffers
       int socketBufferSize =
-          Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768).intValue();
+          Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", DEFAULT_SOCKET_BUFFER_SIZE);
 
       mySock = socketCreator.connectForClient(location.getHostName(), location.getPort(),
           handshakeTimeout, socketBufferSize);
@@ -327,31 +332,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         }
       }
 
-      {
-        int bufSize = 1024;
-        try {
-          bufSize = mySock.getSendBufferSize();
-          if (bufSize < 1024) {
-            bufSize = 1024;
-          }
-        } catch (SocketException ignore) {
+      int bufSize = 1024;
+      try {
+        bufSize = mySock.getSendBufferSize();
+        if (bufSize < 1024) {
+          bufSize = 1024;
         }
-        cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
-      }
-      {
-        // create a "server" memberId we currently don't know much about the
-        // server.
-        // Would be nice for it to send us its member id
-        // TODO: change the serverId to use the endpoint's getMemberId() which
-        // returns a
-        // DistributedMember (once gfecq branch is merged to trunk).
-        MemberAttributes ma =
-            new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
-        sid = new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true,
-            ma);
+      } catch (SocketException ignore) {
       }
+      cb = ServerConnection.allocateCommBuffer(bufSize, mySock);
+
+      // create a "server" memberId; we currently don't know much about the server.
+      // Would be nice for it to send us its member id
+      // TODO: change the serverId to use the endpoint's getMemberId() which returns a
+      // DistributedMember (once gfecq branch is merged to trunk).
+      MemberAttributes ma =
+          new MemberAttributes(0, -1, DistributionManager.NORMAL_DM_TYPE, -1, null, null, null);
+      sid =
+          new InternalDistributedMember(mySock.getInetAddress(), mySock.getPort(), false, true, ma);
+
       success = true;
-    } catch (ConnectException e) {
+    } catch (ConnectException ignore) {
       if (!quitting()) {
         logger.warn(LocalizedMessage
             .create(LocalizedStrings.CacheClientUpdater_0_CONNECTION_WAS_REFUSED, this));
@@ -385,20 +386,22 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             e.getMessage()));
       }
     } finally {
-      connected = success;
+      this.connected = success;
       if (mySock != null) {
         try {
           mySock.setSoTimeout(0);
-        } catch (SocketException e) {
+        } catch (SocketException ignore) {
           // ignore: nothing we can do about this
         }
       }
-      if (connected) {
-        socket = mySock;
-        out = tmpOut;
-        in = tmpIn;
-        serverId = sid;
-        commBuffer = cb;
+
+      if (this.connected) {
+        this.socket = mySock;
+        this.out = tmpOut;
+        this.in = tmpIn;
+        this.serverId = sid;
+        this.commBuffer = cb;
+
         // Don't want the timeout after handshake
         if (mySock != null) {
           try {
@@ -406,12 +409,13 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           } catch (SocketException ignore) {
           }
         }
+
       } else {
-        socket = null;
-        serverId = null;
-        commBuffer = null;
-        out = null;
-        in = null;
+        this.socket = null;
+        this.serverId = null;
+        this.commBuffer = null;
+        this.out = null;
+        this.in = null;
 
         if (mySock != null) {
           try {
@@ -439,29 +443,31 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   }
 
   public boolean isConnected() {
-    return connected;
+    return this.connected;
   }
 
+  @Override
   public boolean isPrimary() {
-    return isPrimary;
+    return this.isPrimary;
   }
 
   public InternalLogWriter getSecurityLogger() {
     return this.qManager.getSecurityLogger();
   }
 
+  @Override
   public void setFailedUpdater(ClientUpdater failedUpdater) {
     this.failedUpdater = failedUpdater;
   }
 
   /**
-   * Performs the work of the client update thread. Creates a <code>ServerSocket</code> and waits
-   * for the server to connect to it.
+   * Performs the work of the client update thread. Creates a {@code ServerSocket} and waits for the
+   * server to connect to it.
    */
   @Override
   public void run() {
+    EntryLogger.setSource(this.serverId, "RI");
     boolean addedListener = false;
-    EntryLogger.setSource(serverId, "RI");
     try {
       this.system.addDisconnectListener(this);
       addedListener = true;
@@ -472,8 +478,10 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         return;
       }
       processMessages();
-    } catch (CancelException e) {
-      return; // just bail
+
+    } catch (CancelException ignore) {
+      // just bail
+
     } finally {
       if (addedListener) {
         this.system.removeDisconnectListener(this);
@@ -486,8 +494,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Notifies this thread to stop processing
    */
-  protected void stopProcessing() {
-    continueProcessing.set(false);// = false;
+  private void stopProcessing() {
+    this.continueProcessing.set(false);
   }
 
   /**
@@ -495,39 +503,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * duplicates. Note: this method is not named stop because this is a Thread which has a deprecated
    * stop method.
    */
-  public void stopUpdater() {
+  private void stopUpdater() {
     boolean isSelfDestroying = Thread.currentThread() == this;
-
     stopProcessing();
+
     // need to also close the socket for this interrupt to wakeup
     // the thread. This fixes bug 35691.
-    // this.close(); // this should not be done here.
 
     if (this.isAlive()) {
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Stopping {}", this.location, this);
       }
+
       if (!isSelfDestroying) {
         interrupt();
         try {
-          if (socket != null) {
-            socket.close();
+          if (this.socket != null) {
+            this.socket.close();
           }
-        } catch (VirtualMachineError err) {
-          SystemFailure.initiateFailure(err);
-          // If this ever returns, rethrow the error. We're poisoned
-          // now, so don't let this thread continue.
-          throw err;
-        } catch (Throwable t) {
-          // Whenever you catch Error or Throwable, you must also
-          // catch VirtualMachineError (see above). However, there is
-          // _still_ a possibility that you are dealing with a cascading
-          // error condition, so you also need to check to see if the JVM
-          // is still usable:
-          SystemFailure.checkFailure();
-          // dont care...
+        } catch (IOException e) {
           if (logger.isDebugEnabled()) {
-            logger.debug(t.getMessage(), t);
+            logger.debug(e.getMessage(), e);
           }
         }
       } // !isSelfDestroying
@@ -537,32 +533,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Signals the run thread to stop, closes underlying resources.
    */
+  @Override
   public void close() {
-    this.continueProcessing.set(false);// = false; // signals we are done.
+    this.continueProcessing.set(false); // signals we are done.
 
-    // Close the socket
-    // This will also cause the underlying streams to fail.
+    // Close the socket. This will also cause the underlying streams to fail.
     try {
-      if (socket != null) {
-        socket.close();
+      if (this.socket != null) {
+        this.socket.close();
       }
-    } catch (Exception e) {
+    } catch (IOException ignore) {
       // ignore
     }
 
-    try {
-      this.stats.close();
-    } catch (Exception e) {
-      // ignore
-    }
+    this.stats.close();
 
     // close the helper
-    try {
-      if (cacheHelper != null) {
-        cacheHelper.close();
-      }
-    } catch (Exception e) {
-      // ignore
+    if (this.cacheHelper != null) {
+      this.cacheHelper.close();
     }
     releaseCommBuffer();
   }
@@ -572,41 +560,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * the server.
    */
   private Message initializeMessage() {
-    Message _message = new Message(2, Version.CURRENT);
-    try {
-      _message.setComms(socket, in, out, commBuffer, this.stats);
-    } catch (IOException e) {
-      if (!quitting()) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Caught following exception while attempting to initialize a server-to-client communication socket and will exit",
-              this, e);
-        }
-        stopProcessing();
-      }
-    }
-    return _message;
+    Message message = new Message(2, Version.CURRENT);
+    message.setComms(this.socket, this.in, this.out, this.commBuffer, this.stats);
+    return message;
   }
 
   /* refinement of method inherited from Thread */
   @Override
   public String toString() {
-    return this.getName() + " (" + this.location.getHostName() + ":" + this.location.getPort()
-        + ")";
+    return getName() + " (" + this.location.getHostName() + ':' + this.location.getPort() + ')';
   }
 
   /**
    * Handle a marker message
    * 
-   * @param m message containing the data
+   * @param clientMessage message containing the data
    */
-  private void handleMarker(Message m) {
+  private void handleMarker(Message clientMessage) {
     try {
       final boolean isDebugEnabled = logger.isDebugEnabled();
       if (isDebugEnabled) {
-        logger.debug("Received marker message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received marker message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
+
       this.qManager.getState().processMarker();
+
       if (isDebugEnabled) {
         logger.debug("Processed marker message");
       }
@@ -621,41 +600,40 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Create or update an entry
    * 
-   * @param m message containing the data
+   * @param clientMessage message containing the data
    */
-  private void handleUpdate(Message m) {
+  private void handleUpdate(Message clientMessage) {
     String regionName = null;
     Object key = null;
     Part valuePart = null;
-    Object newValue = null;
-    byte[] deltaBytes = null;
-    Object fullValue = null;
-    boolean isValueObject = false;
-    int partCnt = 0;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
+
       // Retrieve the data from the put message parts
       if (isDebugEnabled) {
-        logger.debug("Received put message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received put message of length ({} bytes)", clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      boolean isDeltaSent = ((Boolean) m.getPart(partCnt++).getObject()).booleanValue();
-      valuePart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      boolean isDeltaSent = (Boolean) clientMessage.getPart(partCnt++).getObject();
+      valuePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
-      EventID eventId = (EventID) m.getPart(m.getNumberOfParts() - 1).getObject();
+      EventID eventId =
+          (EventID) clientMessage.getPart(clientMessage.getNumberOfParts() - 1).getObject();
 
-      boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
-      boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+      boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+      boolean withCQs = (Boolean) hasCqsPart.getObject();
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
@@ -666,30 +644,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       // object, it will be stored as a CachedDeserializable and
       // deserialized only when requested.
 
-      boolean isCreate = (m.getMessageType() == MessageType.LOCAL_CREATE);
+      boolean isCreate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE;
+
       if (isDebugEnabled) {
-        logger
-            .debug(
-                "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
-                regionName, key, isCreate,
-                (valuePart.isObject() ? new StringBuilder(" value: ")
-                    .append(deserialize(valuePart.getSerializedForm())) : ""),
-                callbackArgument, withInterest, withCQs, eventId, versionTag);
+        logger.debug(
+            "Putting entry for region: {} key: {} create: {}{} callbackArgument: {} withInterest={} withCQs={} eventID={} version={}",
+            regionName, key, isCreate,
+            valuePart.isObject()
+                ? new StringBuilder(" value: ").append(deserialize(valuePart.getSerializedForm()))
+                : "",
+            callbackArgument, withInterest, withCQs, eventId, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
+      Object newValue = null;
+      byte[] deltaBytes = null;
+      Object fullValue = null;
+      boolean isValueObject;
 
       if (!isDeltaSent) {
         // bug #42162 - must check for a serialized null here
         byte[] serializedForm = valuePart.getSerializedForm();
+
         if (isCreate && InternalDataSerializer.isSerializedNull(serializedForm)) {
           // newValue = null; newValue is already null
         } else {
           newValue = valuePart.getSerializedForm();
         }
+
         if (withCQs) {
           fullValue = valuePart.getObject();
         }
+
         isValueObject = valuePart.isObject();
       } else {
         deltaBytes = valuePart.getSerializedForm();
@@ -700,40 +687,49 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         if (isDebugEnabled && !quitting()) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
         }
+
       } else if (region.hasServerProxy() && ServerResponseMatrix
-          .checkForValidStateAfterNotification(region, key, m.getMessageType())
+          .checkForValidStateAfterNotification(region, key, clientMessage.getMessageType())
           && (withInterest || !withCQs)) {
         @Released
         EntryEventImpl newEvent = null;
+
         try {
           // Create an event and put the entry
           newEvent = EntryEventImpl.create(region,
-              ((m.getMessageType() == MessageType.LOCAL_CREATE) ? Operation.CREATE
-                  : Operation.UPDATE),
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE ? Operation.CREATE
+                  : Operation.UPDATE,
               key, null /* newValue */, callbackArgument /* callbackArg */, true /* originRemote */,
               eventId.getDistributedMember());
+
           newEvent.setVersionTag(versionTag);
           newEvent.setFromServer(true);
+
           region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, deltaBytes,
-              isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
-              qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+              isValueObject, callbackArgument,
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+              this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+              eventId);
+
           this.isOpCompleted = true;
+
           // bug 45520 - ConcurrentCacheModificationException is not thrown and we must check this
           // flag
-          // if (newEvent.isConcurrencyConflict()) {
-          // return; // this is logged elsewhere at fine level
-          // }
           if (withCQs && isDeltaSent) {
             fullValue = newEvent.getNewValue();
           }
-        } catch (InvalidDeltaException ide) {
+        } catch (InvalidDeltaException ignore) {
           Part fullValuePart = requestFullValue(eventId, "Caught InvalidDeltaException.");
           region.getCachePerfStats().incDeltaFullValuesRequested();
-          fullValue = newValue = fullValuePart.getObject();
-          isValueObject = Boolean.valueOf(fullValuePart.isObject());
+          fullValue = newValue = fullValuePart.getObject(); // TODO: fix this line
+          isValueObject = fullValuePart.isObject();
+
           region.basicBridgeClientUpdate(eventId.getDistributedMember(), key, newValue, null,
-              isValueObject, callbackArgument, m.getMessageType() == MessageType.LOCAL_CREATE,
-              qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent, eventId);
+              isValueObject, callbackArgument,
+              clientMessage.getMessageType() == MessageType.LOCAL_CREATE,
+              this.qManager.getState().getProcessedMarker() || !this.isDurableClient, newEvent,
+              eventId);
+
           this.isOpCompleted = true;
         } finally {
           if (newEvent != null)
@@ -748,20 +744,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
       // Update CQs. CQs can exist without client region.
       if (withCQs) {
-        Part numCqsPart = m.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, fullValue,
-            deltaBytes, eventId);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), key, fullValue, deltaBytes, eventId);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_PUT_ENTRY_REGION_0_KEY_1_VALUE_2
-              .toLocalizedString(
-                  new Object[] {regionName, key, deserialize(valuePart.getSerializedForm())});
+              .toLocalizedString(regionName, key, deserialize(valuePart.getSerializedForm()));
       handleException(message, e);
     }
   }
@@ -774,12 +769,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     if (isDebugEnabled) {
       logger.debug("{} Requesting full value...", reason);
     }
-    Part result = (Part) GetEventValueOp.executeOnPrimary(qManager.getPool(), eventId, null);
+    Part result = (Part) GetEventValueOp.executeOnPrimary(this.qManager.getPool(), eventId, null);
 
     if (result == null) {
       // Just log a warning. Do not stop CCU thread.
+      // TODO: throw a subclass of Exception
       throw new Exception("Could not retrieve full value for " + eventId);
     }
+
     if (isDebugEnabled) {
       logger.debug("Full value received.");
     }
@@ -789,39 +786,41 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Invalidate an entry
    * 
-   * @param m message describing the entry
+   * @param clientMessage message describing the entry
    */
-  private void handleInvalidate(Message m) {
+  private void handleInvalidate(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
+
       // Retrieve the data from the local-invalidate message parts
       if (isDebugEnabled) {
-        logger.debug("Received invalidate message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received invalidate message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
 
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
 
       Object callbackArgument = callbackArgumentPart.getObject();
-      boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
-      boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
+      boolean withInterest = (Boolean) isInterestListPassedPart.getObject();
+      boolean withCQs = (Boolean) hasCqsPart.getObject();
 
       if (isDebugEnabled) {
         logger.debug(
@@ -829,34 +828,36 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             regionName, key, callbackArgument, withInterest, withCQs, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
+
       } else {
         if (region.hasServerProxy() && (withInterest || !withCQs)) {
           try {
-            Part eid = m.getPart(m.getNumberOfParts() - 1);
+            Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
             EventID eventId = (EventID) eid.getObject();
+
             try {
               region.basicBridgeClientInvalidate(eventId.getDistributedMember(), key,
                   callbackArgument,
-                  qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+                  this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
                   versionTag);
-            } catch (ConcurrentCacheModificationException e) {
-              // return; allow CQs to be processed
+            } catch (ConcurrentCacheModificationException ignore) {
+              // allow CQs to be processed
             }
+
             this.isOpCompleted = true;
             // fix for 36615
-            qManager.getState().incrementInvalidatedStats();
+            this.qManager.getState().incrementInvalidatedStats();
 
             if (isDebugEnabled) {
               logger.debug("Invalidated entry for region: {} key: {} callbackArgument: {}",
                   regionName, key, callbackArgument);
             }
-          } catch (EntryNotFoundException e) {
-            /* ignore */
+          } catch (EntryNotFoundException ignore) {
             if (isDebugEnabled && !quitting()) {
               logger.debug("Already invalidated entry for region: {} key: {} callbackArgument: {}",
                   regionName, key, callbackArgument);
@@ -869,19 +870,20 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       if (withCQs) {
         // The client may have been registered to receive invalidates for
         // create and updates operations. Get the actual region operation.
-        Part regionOpType = m.getPart(partCnt++);
-        Part numCqsPart = m.getPart(partCnt++);
+        Part regionOpType = clientMessage.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), regionOpType.getInt(), key, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(), regionOpType.getInt(),
+            key, null);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       final String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_INVALIDATE_ENTRY_REGION_0_KEY_1
-              .toLocalizedString(new Object[] {regionName, key});
+              .toLocalizedString(regionName, key);
       handleException(message, e);
     }
   }
@@ -889,26 +891,27 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * locally destroy an entry
    * 
-   * @param m message describing the entry
+   * @param clientMessage message describing the entry
    */
-  private void handleDestroy(Message m) {
+  private void handleDestroy(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       this.isOpCompleted = false;
       // Retrieve the data from the local-destroy message parts
       if (isDebugEnabled) {
-        logger.debug("Received destroy message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received destroy message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      VersionTag versionTag = (VersionTag) m.getPart(partCnt++).getObject();
+      VersionTag versionTag = (VersionTag) clientMessage.getPart(partCnt++).getObject();
       if (versionTag != null) {
         versionTag.replaceNullIDs((InternalDistributedMember) this.endpoint.getMemberId());
       }
@@ -916,8 +919,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
 
-      Part isInterestListPassedPart = m.getPart(partCnt++);
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part isInterestListPassedPart = clientMessage.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       boolean withInterest = ((Boolean) isInterestListPassedPart.getObject()).booleanValue();
       boolean withCQs = ((Boolean) hasCqsPart.getObject()).booleanValue();
@@ -929,30 +932,32 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
             regionName, key, callbackArgument, withInterest, withCQs, versionTag);
       }
 
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
-      EventID eventId = null;
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
+
       } else if (region.hasServerProxy() && (withInterest || !withCQs)) {
+        EventID eventId = null;
         try {
-          Part eid = m.getPart(m.getNumberOfParts() - 1);
+          Part eid = clientMessage.getPart(clientMessage.getNumberOfParts() - 1);
           eventId = (EventID) eid.getObject();
+
           try {
             region.basicBridgeClientDestroy(eventId.getDistributedMember(), key, callbackArgument,
-                qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
+                this.qManager.getState().getProcessedMarker() || !this.isDurableClient, eventId,
                 versionTag);
-          } catch (ConcurrentCacheModificationException e) {
-            // return; allow CQs to be processed
+          } catch (ConcurrentCacheModificationException ignore) {
+            // allow CQs to be processed
           }
+
           this.isOpCompleted = true;
           if (isDebugEnabled) {
             logger.debug("Destroyed entry for region: {} key: {} callbackArgument: {}", regionName,
                 key, callbackArgument);
           }
-        } catch (EntryNotFoundException e) {
-          /* ignore */
+        } catch (EntryNotFoundException ignore) {
           if (isDebugEnabled && !quitting()) {
             logger.debug(
                 "Already destroyed entry for region: {} key: {} callbackArgument: {} eventId={}",
@@ -963,18 +968,19 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       }
 
       if (withCQs) {
-        Part numCqsPart = m.getPart(partCnt++);
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), key, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), key, null);
         this.isOpCompleted = true;
       }
     } catch (Exception e) {
       String message =
           LocalizedStrings.CacheClientUpdater_THE_FOLLOWING_EXCEPTION_OCCURRED_WHILE_ATTEMPTING_TO_DESTROY_ENTRY_REGION_0_KEY_1
-              .toLocalizedString(new Object[] {regionName, key});
+              .toLocalizedString(regionName, key);
       handleException(message, e);
     }
   }
@@ -982,44 +988,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Locally destroy a region
    * 
-   * @param m message describing the region
+   * @param clientMessage message describing the region
    */
-  private void handleDestroyRegion(Message m) {
-    Part regionNamePart = null, callbackArgumentPart = null;
+  private void handleDestroyRegion(Message clientMessage) {
     String regionName = null;
-    Object callbackArgument = null;
-    LocalRegion region = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the local-destroy-region message parts
       if (isDebugEnabled) {
-        logger.debug("Received destroy region message of length ({} bytes)", m.getPayloadLength());
+        logger.debug("Received destroy region message of length ({} bytes)",
+            clientMessage.getPayloadLength());
       }
-      regionNamePart = m.getPart(partCnt++);
-      callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
       regionName = regionNamePart.getString();
-      callbackArgument = callbackArgumentPart.getObject();
+      Object callbackArgument = callbackArgumentPart.getObject();
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       if (isDebugEnabled) {
         logger.debug("Destroying region: {} callbackArgument: {}", regionName, callbackArgument);
       }
 
       // Handle CQs if any on this region.
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        // TODO: partCnt is unused -- does processCqs have side effects
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
@@ -1036,7 +1042,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           logger.debug("Destroyed region: {} callbackArgument: {}", regionName, callbackArgument);
         }
       }
-    } catch (RegionDestroyedException e) { // already destroyed
+    } catch (RegionDestroyedException ignore) { // already destroyed
       if (isDebugEnabled) {
         logger.debug("region already destroyed: {}", regionName);
       }
@@ -1051,24 +1057,24 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Locally clear a region
    * 
-   * @param m message describing the region to clear
+   * @param clientMessage message describing the region to clear
    */
-  private void handleClearRegion(Message m) {
+  private void handleClearRegion(Message clientMessage) {
     String regionName = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the clear-region message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received clear region message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part callbackArgumentPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part callbackArgumentPart = clientMessage.getPart(partCnt++);
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       Object callbackArgument = callbackArgumentPart.getObject();
@@ -1076,17 +1082,18 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         logger.debug("Clearing region: {} callbackArgument: {}", regionName, callbackArgument);
       }
 
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
@@ -1099,7 +1106,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       if (region.hasServerProxy()) {
         // Locally clear the region
         region.basicBridgeClientClear(callbackArgument,
-            qManager.getState().getProcessedMarker() || !this.isDurableClient);
+            this.qManager.getState().getProcessedMarker() || !this.isDurableClient);
 
         if (isDebugEnabled) {
           logger.debug("Cleared region: {} callbackArgument: {}", regionName, callbackArgument);
@@ -1117,50 +1124,44 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * Locally invalidate a region NOTE: Added as part of bug#38048. The code only takes care of CQ
    * processing. Support needs to be added for local region invalidate.
    * 
-   * @param m message describing the region to clear
+   * @param clientMessage message describing the region to clear
    */
-  private void handleInvalidateRegion(Message m) {
+  private void handleInvalidateRegion(Message clientMessage) {
     String regionName = null;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the invalidate-region message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received invalidate region message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
       partCnt++; // Part callbackArgumentPart = m.getPart(partCnt++);
 
-      Part hasCqsPart = m.getPart(partCnt++);
+      Part hasCqsPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
-      // Object callbackArgument = callbackArgumentPart.getObject();
 
-      if (((Boolean) hasCqsPart.getObject()).booleanValue()) {
-        Part numCqsPart = m.getPart(partCnt++);
+      if ((Boolean) hasCqsPart.getObject()) {
+        Part numCqsPart = clientMessage.getPart(partCnt++);
         if (isDebugEnabled) {
           logger.debug("Received message has CQ Event. Number of cqs interested in the event : {}",
               numCqsPart.getInt() / 2);
         }
-        partCnt = processCqs(m, partCnt, numCqsPart.getInt(), m.getMessageType(), null, null);
+        // TODO: partCnt is unused
+        partCnt = processCqs(clientMessage, partCnt, numCqsPart.getInt(),
+            clientMessage.getMessageType(), null, null);
       }
 
       // Confirm that the region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("Region named {} does not exist", regionName);
         }
-        return;
-      }
-
-      // Verify that the region in question should respond to this
-      // message
-      if (region.hasServerProxy()) {
-        return;
       }
 
     } catch (Exception e) {
@@ -1174,40 +1175,39 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Register instantiators locally
    *
-   * @param msg message describing the new instantiators
+   * @param clientMessage message describing the new instantiators
    * @param eventId eventId of the instantiators
    */
-  private void handleRegisterInstantiator(Message msg, EventID eventId) {
+  private void handleRegisterInstantiator(Message clientMessage, EventID eventId) {
     String instantiatorClassName = null;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
-      int noOfParts = msg.getNumberOfParts();
+      int noOfParts = clientMessage.getNumberOfParts();
       if (isDebugEnabled) {
         logger.debug("{}: Received register instantiators message of parts {}", getName(),
             noOfParts);
       }
+
       Assert.assertTrue((noOfParts - 1) % 3 == 0);
-      for (int i = 0; i < noOfParts - 1; i = i + 3) {
+      for (int i = 0; i < noOfParts - 1; i += 3) {
         instantiatorClassName =
-            (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
-        String instantiatedClassName =
-            (String) CacheServerHelper.deserialize(msg.getPart(i + 1).getSerializedForm());
-        int id = msg.getPart(i + 2).getInt();
+            (String) CacheServerHelper.deserialize(clientMessage.getPart(i).getSerializedForm());
+        String instantiatedClassName = (String) CacheServerHelper
+            .deserialize(clientMessage.getPart(i + 1).getSerializedForm());
+        int id = clientMessage.getPart(i + 2).getInt();
         InternalInstantiator.register(instantiatorClassName, instantiatedClassName, id, false,
-            eventId, null/* context */);
-        // distribute is false because we don't want to propagate this to
-        // servers recursively
+            eventId, null);
+        // distribute is false because we don't want to propagate this to servers recursively
       }
 
       // CALLBACK TESTING PURPOSE ONLY
       if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
-        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-        bo.afterReceivingFromServer(eventId);
+        ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+        clientServerObserver.afterReceivingFromServer(eventId);
       }
 
-    }
-    // TODO bug: can the following catch be more specific?
-    catch (Exception e) {
+    } catch (Exception e) {
       if (isDebugEnabled) {
         logger.debug("{}: Caught following exception while attempting to read Instantiator : {}",
             this, instantiatorClassName, e);
@@ -1218,6 +1218,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   private void handleRegisterDataSerializer(Message msg, EventID eventId) {
     Class dataSerializerClass = null;
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       int noOfParts = msg.getNumberOfParts();
       if (isDebugEnabled) {
@@ -1231,8 +1232,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
               (String) CacheServerHelper.deserialize(msg.getPart(i).getSerializedForm());
           int id = msg.getPart(i + 1).getInt();
           InternalDataSerializer.register(dataSerializerClassName, false, eventId, null, id);
-          // distribute is false because we don't want to propagate this to
-          // servers recursively
+          // distribute is false because we don't want to propagate this to servers recursively
 
           int numOfClasses = msg.getPart(i + 2).getInt();
           int j = 0;
@@ -1241,7 +1241,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
                 (String) CacheServerHelper.deserialize(msg.getPart(i + 3 + j).getSerializedForm());
             InternalDataSerializer.updateSupportedClassesMap(dataSerializerClassName, className);
           }
-          i = i + 3 + j;
+
+          i += 3 + j;
         } catch (ClassNotFoundException e) {
           if (isDebugEnabled) {
             logger.debug(
@@ -1257,9 +1258,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         bo.afterReceivingFromServer(eventId);
       }
 
-    }
-    // TODO bug: can the following catch be more specific?
-    catch (Exception e) {
+    } catch (Exception e) {
       if (isDebugEnabled) {
         logger.debug("{}: Caught following exception while attempting to read DataSerializer : {}",
             this, dataSerializerClass, e);
@@ -1270,93 +1269,87 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
   /**
    * Processes message to invoke CQ listeners.
    */
-  private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
-      Object key, Object value) {
-    return processCqs(m, startMessagePart, numCqParts, messageType, key, value, null,
-        null/* eventId */);
+  private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+      int messageType, Object key, Object value) {
+    return processCqs(clientMessage, startMessagePart, numCqParts, messageType, key, value, null,
+        null);
   }
 
-  private int processCqs(Message m, int startMessagePart, int numCqParts, int messageType,
-      Object key, Object value, byte[] delta, EventID eventId) {
+  private int processCqs(Message clientMessage, int startMessagePart, int numCqParts,
+      int messageType, Object key, Object value, byte[] delta, EventID eventId) {
     HashMap cqs = new HashMap();
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
     for (int cqCnt = 0; cqCnt < numCqParts;) {
-      StringBuilder str = null;
+      StringBuilder sb = null;
       if (isDebugEnabled) {
-        str = new StringBuilder(100);
-        str.append("found these queries: ");
+        sb = new StringBuilder(100);
+        sb.append("found these queries: ");
       }
       try {
         // Get CQ Name.
-        Part cqNamePart = m.getPart(startMessagePart + (cqCnt++));
+        Part cqNamePart = clientMessage.getPart(startMessagePart + cqCnt++);
         // Get CQ Op.
-        Part cqOpPart = m.getPart(startMessagePart + (cqCnt++));
-        cqs.put(cqNamePart.getString(), Integer.valueOf(cqOpPart.getInt()));
+        Part cqOpPart = clientMessage.getPart(startMessagePart + cqCnt++);
+        cqs.put(cqNamePart.getString(), cqOpPart.getInt());
 
-        if (str != null) {
-          str.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append("  ");
+        if (sb != null) {
+          sb.append(cqNamePart.getString()).append(" op=").append(cqOpPart.getInt()).append("  ");
         }
-      } catch (Exception ex) {
+      } catch (Exception ignore) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.CacheClientUpdater_ERROR_WHILE_PROCESSING_THE_CQ_MESSAGE_PROBLEM_WITH_READING_MESSAGE_FOR_CQ_0,
             cqCnt));
       }
-      if (isDebugEnabled && str != null) {
-        logger.debug(str);
+      if (isDebugEnabled) {
+        logger.debug(sb);
       }
     }
 
-    {
-      CqService cqService = this.cache.getCqService();
-      try {
-        cqService.dispatchCqListeners(cqs, messageType, key, value, delta, qManager, eventId);
-      } catch (Exception ex) {
-        logger.warn(LocalizedMessage.create(
-            LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
-            ex.getMessage()));
-        if (isDebugEnabled) {
-          logger.debug("Failed to invoke CQ Dispatcher.", ex);
-        }
+    CqService cqService = this.cache.getCqService();
+    try {
+      cqService.dispatchCqListeners(cqs, messageType, key, value, delta, this.qManager, eventId);
+    } catch (Exception ex) {
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.CacheClientUpdater_FAILED_TO_INVOKE_CQ_DISPATCHER_ERROR___0,
+          ex.getMessage()));
+      if (isDebugEnabled) {
+        logger.debug("Failed to invoke CQ Dispatcher.", ex);
       }
     }
 
-    return (startMessagePart + numCqParts);
+    return startMessagePart + numCqParts;
   }
 
-  private void handleRegisterInterest(Message m) {
+  private void handleRegisterInterest(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int interestType;
-    byte interestResultPolicy;
-    boolean isDurable;
-    boolean receiveUpdatesAsInvalidates;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the add interest message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received add interest message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part interestTypePart = m.getPart(partCnt++);
-      Part interestResultPolicyPart = m.getPart(partCnt++);
-      Part isDurablePart = m.getPart(partCnt++);
-      Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part interestTypePart = clientMessage.getPart(partCnt++);
+      Part interestResultPolicyPart = clientMessage.getPart(partCnt++);
+      Part isDurablePart = clientMessage.getPart(partCnt++);
+      Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
-      interestType = ((Integer) interestTypePart.getObject()).intValue();
-      interestResultPolicy = ((Byte) interestResultPolicyPart.getObject()).byteValue();
-      isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
-      receiveUpdatesAsInvalidates =
-          ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+      int interestType = (Integer) interestTypePart.getObject();
+      byte interestResultPolicy = (Byte) interestResultPolicyPart.getObject();
+      boolean isDurable = (Boolean) isDurablePart.getObject();
+      boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
 
       // Confirm that region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled && !quitting()) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1386,38 +1379,34 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
-  private void handleUnregisterInterest(Message m) {
+  private void handleUnregisterInterest(Message clientMessage) {
     String regionName = null;
     Object key = null;
-    int interestType;
-    boolean isDurable;
-    boolean receiveUpdatesAsInvalidates;
-    int partCnt = 0;
-
     final boolean isDebugEnabled = logger.isDebugEnabled();
+
     try {
       // Retrieve the data from the remove interest message parts
       if (isDebugEnabled) {
         logger.debug("{}: Received remove interest message of length ({} bytes)", this,
-            m.getPayloadLength());
+            clientMessage.getPayloadLength());
       }
 
-      Part regionNamePart = m.getPart(partCnt++);
-      Part keyPart = m.getPart(partCnt++);
-      Part interestTypePart = m.getPart(partCnt++);
-      Part isDurablePart = m.getPart(partCnt++);
-      Part receiveUpdatesAsInvalidatesPart = m.getPart(partCnt++);
+      int partCnt = 0;
+      Part regionNamePart = clientMessage.getPart(partCnt++);
+      Part keyPart = clientMessage.getPart(partCnt++);
+      Part interestTypePart = clientMessage.getPart(partCnt++);
+      Part isDurablePart = clientMessage.getPart(partCnt++);
+      Part receiveUpdatesAsInvalidatesPart = clientMessage.getPart(partCnt++);
       // Not reading the eventId part
 
       regionName = regionNamePart.getString();
       key = keyPart.getStringOrObject();
-      interestType = ((Integer) interestTypePart.getObject()).intValue();
-      isDurable = ((Boolean) isDurablePart.getObject()).booleanValue();
-      receiveUpdatesAsInvalidates =
-          ((Boolean) receiveUpdatesAsInvalidatesPart.getObject()).booleanValue();
+      int interestType = (Integer) interestTypePart.getObject();
+      boolean isDurable = (Boolean) isDurablePart.getObject();
+      boolean receiveUpdatesAsInvalidates = (Boolean) receiveUpdatesAsInvalidatesPart.getObject();
 
       // Confirm that region exists
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
       if (region == null) {
         if (isDebugEnabled) {
           logger.debug("{}: Region named {} does not exist", this, regionName);
@@ -1445,14 +1434,17 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
-  private void handleTombstoneOperation(Message msg) {
+  private void handleTombstoneOperation(Message clientMessage) {
     String regionName = "unknown";
+
     try { // not sure why this isn't done by the caller
       int partIdx = 0;
+
       // see ClientTombstoneMessage.getGFE70Message
-      regionName = msg.getPart(partIdx++).getString();
-      int op = msg.getPart(partIdx++).getInt();
-      LocalRegion region = (LocalRegion) cacheHelper.getRegion(regionName);
+      regionName = clientMessage.getPart(partIdx++).getString();
+      int op = clientMessage.getPart(partIdx++).getInt();
+      LocalRegion region = (LocalRegion) this.cacheHelper.getRegion(regionName);
+
       if (region == null) {
         if (!quitting()) {
           if (logger.isDebugEnabled()) {
@@ -1461,24 +1453,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         }
         return;
       }
+
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Received tombstone operation for region {} with operation={}", this,
             region, op);
       }
+
       if (!region.getConcurrencyChecksEnabled()) {
         return;
       }
+
       switch (op) {
         case 0:
           Map<VersionSource, Long> regionGCVersions =
-              (Map<VersionSource, Long>) msg.getPart(partIdx++).getObject();
-          EventID eventID = (EventID) msg.getPart(partIdx++).getObject();
+              (Map<VersionSource, Long>) clientMessage.getPart(partIdx++).getObject();
+          EventID eventID = (EventID) clientMessage.getPart(partIdx++).getObject();
           region.expireTombstones(regionGCVersions, eventID, null);
           break;
+
         case 1:
-          Set<Object> removedKeys = (Set<Object>) msg.getPart(partIdx++).getObject();
+          Set<Object> removedKeys = (Set<Object>) clientMessage.getPart(partIdx++).getObject();
           region.expireTombstoneKeys(removedKeys);
           break;
+
         default:
           throw new IllegalArgumentException("unknown operation type " + op);
       }
@@ -1494,22 +1491,21 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private boolean quitting() {
     if (isInterrupted()) {
-      // Any time an interrupt is thrown at this thread, regard it as a
-      // request to terminate
+      // Any time an interrupt is thrown at this thread, regard it as a request to terminate
       return true;
     }
-    if (!continueProcessing.get()) {
+    if (!this.continueProcessing.get()) {
       // de facto flag indicating we are to stop
       return true;
     }
-    if (cache != null && cache.getCancelCriterion().isCancelInProgress()) {
+    if (this.cache != null && this.cache.getCancelCriterion().isCancelInProgress()) {
       // System is cancelling
       return true;
     }
 
     // The pool stuff is really sick, so it's possible for us to have a distributed
     // system that is not the same as our cache. Check it just in case...
-    if (system.getCancelCriterion().isCancelInProgress()) {
+    if (this.system.getCancelCriterion().isCancelInProgress()) {
       return true;
     }
 
@@ -1531,15 +1527,15 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           this.failedUpdater.join(5000);
         }
       }
-    } catch (InterruptedException ie) {
+    } catch (InterruptedException ignore) {
       gotInterrupted = true;
-      return; // just bail, because I have not done anything yet
+      // just bail, because I have not done anything yet
     } finally {
       if (!gotInterrupted && this.failedUpdater != null) {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CacheClientUpdater_0_HAS_COMPLETED_WAITING_FOR_1,
             new Object[] {this, this.failedUpdater}));
-        failedUpdater = null;
+        this.failedUpdater = null;
       }
     }
   }
@@ -1548,6 +1544,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * Processes messages received from the server.
    * 
    * Only certain types of messages are handled.
+   *
+   * TODO: Method 'processMessages' is too complex to analyze by data flow algorithm
    * 
    * @see MessageType#CLIENT_MARKER
    * @see MessageType#LOCAL_CREATE
@@ -1558,11 +1556,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @see MessageType#CLEAR_REGION
    * @see ClientUpdateMessage
    */
-  protected void processMessages() {
+  private void processMessages() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
-      Part eid = null;
-      Message _message = initializeMessage();
+      Message clientMessage = initializeMessage();
+
       if (quitting()) {
         if (isDebugEnabled) {
           logger.debug("processMessages quitting early because we have stopped");
@@ -1570,11 +1568,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
         // our caller calls close which will notify all waiters for our init
         return;
       }
+
       logger.info(LocalizedMessage
           .create(LocalizedStrings.CacheClientUpdater_0_READY_TO_PROCESS_MESSAGES, this));
 
-      while (continueProcessing.get()) {
-        // SystemFailure.checkFailure(); dm will check this
+      while (this.continueProcessing.get()) {
         if (quitting()) {
           if (isDebugEnabled) {
             logger.debug("termination detected");
@@ -1594,12 +1592,12 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
 
         try {
           // Read the message
-          _message.recv();
+          clientMessage.recv();
 
           // Wait for the previously failed cache client updater
           // to finish. This will avoid out of order messages.
           waitForFailedUpdater();
-          cache.waitForRegisterInterestsInProgress();
+          this.cache.waitForRegisterInterestsInProgress();
           if (quitting()) {
             if (isDebugEnabled) {
               logger.debug("processMessages quitting before processing message");
@@ -1608,7 +1606,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           }
 
           // If the message is a ping, ignore it
-          if (_message.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
+          if (clientMessage.getMessageType() == MessageType.SERVER_TO_CLIENT_PING) {
             if (isDebugEnabled) {
               logger.debug("{}: Received ping", this);
             }
@@ -1616,76 +1614,80 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           }
 
           boolean isDeltaSent = false;
-          boolean isCreateOrUpdate = _message.getMessageType() == MessageType.LOCAL_CREATE
-              || _message.getMessageType() == MessageType.LOCAL_UPDATE;
+          boolean isCreateOrUpdate = clientMessage.getMessageType() == MessageType.LOCAL_CREATE
+              || clientMessage.getMessageType() == MessageType.LOCAL_UPDATE;
           if (isCreateOrUpdate) {
-            isDeltaSent = ((Boolean) _message.getPart(2).getObject()).booleanValue();
+            isDeltaSent = (Boolean) clientMessage.getPart(2).getObject();
           }
 
           // extract the eventId and verify if it is a duplicate event
           // if it is a duplicate event, ignore
           // @since GemFire 5.1
-          int numberOfParts = _message.getNumberOfParts();
-          eid = _message.getPart(numberOfParts - 1);
+          int numberOfParts = clientMessage.getNumberOfParts();
+          Part eid = clientMessage.getPart(numberOfParts - 1);
+
           // TODO the message handling methods also deserialized the eventID - inefficient
           EventID eventId = (EventID) eid.getObject();
 
           // no need to verify if the instantiator msg is duplicate or not
-          if (_message.getMessageType() != MessageType.REGISTER_INSTANTIATORS
-              && _message.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
+          if (clientMessage.getMessageType() != MessageType.REGISTER_INSTANTIATORS
+              && clientMessage.getMessageType() != MessageType.REGISTER_DATASERIALIZERS) {
             if (this.qManager.getState().verifyIfDuplicate(eventId,
                 !(this.isDurableClient || isDeltaSent))) {
               continue;
             }
           }
+
           if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
-            logger.trace(LogMarker.BRIDGE_SERVER,
-                "Processing event with id {}" + eventId.expensiveToString());
+            logger.trace(LogMarker.BRIDGE_SERVER, "Processing event with id {}",
+                eventId.expensiveToString());
           }
+
           this.isOpCompleted = true;
+
           // Process the message
-          switch (_message.getMessageType()) {
+          switch (clientMessage.getMessageType()) {
             case MessageType.LOCAL_CREATE:
             case MessageType.LOCAL_UPDATE:
-              handleUpdate(_message);
+              handleUpdate(clientMessage);
               break;
             case MessageType.LOCAL_INVALIDATE:
-              handleInvalidate(_message);
+              handleInvalidate(clientMessage);
               break;
             case MessageType.LOCAL_DESTROY:
-              handleDestroy(_message);
+              handleDestroy(clientMessage);
               break;
             case MessageType.LOCAL_DESTROY_REGION:
-              handleDestroyRegion(_message);
+              handleDestroyRegion(clientMessage);
               break;
             case MessageType.CLEAR_REGION:
-              handleClearRegion(_message);
+              handleClearRegion(clientMessage);
               break;
             case MessageType.REGISTER_INSTANTIATORS:
-              handleRegisterInstantiator(_message, eventId);
+              handleRegisterInstantiator(clientMessage, eventId);
               break;
             case MessageType.REGISTER_DATASERIALIZERS:
-              handleRegisterDataSerializer(_message, eventId);
+              handleRegisterDataSerializer(clientMessage, eventId);
               break;
             case MessageType.CLIENT_MARKER:
-              handleMarker(_message);
+              handleMarker(clientMessage);
               break;
             case MessageType.INVALIDATE_REGION:
-              handleInvalidateRegion(_message);
+              handleInvalidateRegion(clientMessage);
               break;
             case MessageType.CLIENT_REGISTER_INTEREST:
-              handleRegisterInterest(_message);
+              handleRegisterInterest(clientMessage);
               break;
             case MessageType.CLIENT_UNREGISTER_INTEREST:
-              handleUnregisterInterest(_message);
+              handleUnregisterInterest(clientMessage);
               break;
             case MessageType.TOMBSTONE_OPERATION:
-              handleTombstoneOperation(_message);
+              handleTombstoneOperation(clientMessage);
               break;
             default:
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.CacheClientUpdater_0_RECEIVED_AN_UNSUPPORTED_MESSAGE_TYPE_1,
-                  new Object[] {this, MessageType.getString(_message.getMessageType())}));
+                  new Object[] {this, MessageType.getString(clientMessage.getMessageType())}));
               break;
           }
 
@@ -1700,7 +1702,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           // likely to send pings...
           // and the ClientHealthMonitor will cause a disconnect
 
-        } catch (InterruptedIOException e) {
+        } catch (InterruptedIOException ignore) {
           // Per Sun's support web site, this exception seems to be peculiar
           // to Solaris, and may eventually not even be generated there.
           //
@@ -1708,62 +1710,59 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
           // isInterrupted() is false. (How very odd!)
           //
           // We regard it the same as an InterruptedException
-          this.endPointDied = true;
 
-          continueProcessing.set(false);// = false;
+          this.continueProcessing.set(false);
           if (isDebugEnabled) {
             logger.debug("InterruptedIOException");
           }
+
         } catch (IOException e) {
-          this.endPointDied = true;
           // Either the server went away, or we caught a closing condition.
           if (!quitting()) {
             // Server departed; print a message.
-            String message = ": Caught the following exception and will exit: ";
-            String errMessage = e.getMessage();
-            if (errMessage == null) {
-              errMessage = "";
-            }
-            ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-            bo.beforeFailoverByCacheClientUpdater(this.location);
-            eManager.serverCrashed(this.endpoint);
+            ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+            clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+            this.eManager.serverCrashed(this.endpoint);
             if (isDebugEnabled) {
-              logger.debug("" + message + e);
+              logger.debug("Caught the following exception and will exit", e);
             }
           } // !quitting
 
           // In any event, terminate this thread.
-          continueProcessing.set(false);// = false;
+          this.continueProcessing.set(false);
           if (isDebugEnabled) {
             logger.debug("terminated due to IOException");
           }
+
         } catch (Exception e) {
           if (!quitting()) {
-            this.endPointDied = true;
-            ClientServerObserver bo = ClientServerObserverHolder.getInstance();
-            bo.beforeFailoverByCacheClientUpdater(this.location);
-            eManager.serverCrashed(this.endpoint);
+            ClientServerObserver clientServerObserver = ClientServerObserverHolder.getInstance();
+            clientServerObserver.beforeFailoverByCacheClientUpdater(this.location);
+            this.eManager.serverCrashed(this.endpoint);
             String message = ": Caught the following exception and will exit: ";
             handleException(message, e);
           }
+
           // In any event, terminate this thread.
-          continueProcessing.set(false);// = false; // force termination
+          this.continueProcessing.set(false);// = false; // force termination
           if (isDebugEnabled) {
             logger.debug("CCU terminated due to Exception");
           }
+
         } finally {
-          _message.clear();
+          clientMessage.clear();
         }
       } // while
+
     } finally {
       if (isDebugEnabled) {
         logger.debug("has stopped and cleaning the helper ..");
       }
-      this.close(); // added to fixes some race conditions associated with 38382
+      close(); // added to fix some race conditions associated with 38382
       // this will make sure that if this thread dies without starting QueueMgr then it will start..
       // 1. above we ignore InterruptedIOException and this thread dies without informing QueueMgr
-      // 2. if there is some other race codition with continueProcessing flag
-      this.qManager.checkEndpoint(this, endpoint);
+      // 2. if there is some other race condition with continueProcessing flag
+      this.qManager.checkEndpoint(this, this.endpoint);
     }
   }
 
@@ -1796,12 +1795,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    */
   private Object deserialize(byte[] serializedBytes) {
     Object deserializedObject = serializedBytes;
-    // This is a debugging method so ignore all exceptions like
-    // ClassNotFoundException
+    // This is a debugging method so ignore all exceptions like ClassNotFoundException
     try {
       DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serializedBytes));
       deserializedObject = DataSerializer.readObject(dis);
-    } catch (Exception e) {
+    } catch (ClassNotFoundException | IOException ignore) {
     }
     return deserializedObject;
   }
@@ -1810,18 +1808,14 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @return the local port of our {@link #socket}
    */
   protected int getLocalPort() {
-    return socket.getLocalPort();
+    return this.socket.getLocalPort();
   }
 
+  @Override
   public void onDisconnect(InternalDistributedSystem sys) {
     stopUpdater();
   }
 
-  /**
-   * true if the EndPoint represented by this updater thread has died.
-   */
-  private volatile boolean endPointDied = false;
-
   private void verifySocketBufferSize(int requestedBufferSize, int actualBufferSize, String type) {
     if (actualBufferSize < requestedBufferSize) {
       logger.info(LocalizedMessage.create(
@@ -1837,11 +1831,11 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
    * @since GemFire 5.7
    */
   public static class CCUStats implements MessageStats {
-    // static fields
+
     private static final StatisticsType type;
-    private final static int messagesBeingReceivedId;
-    private final static int messageBytesBeingReceivedId;
-    private final static int receivedBytesId;
+    private static final int messagesBeingReceivedId;
+    private static final int messageBytesBeingReceivedId;
+    private static final int receivedBytesId;
 
     static {
       StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
@@ -1863,7 +1857,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     // instance fields
     private final Statistics stats;
 
-    public CCUStats(DistributedSystem ids, ServerLocation location) {
+    CCUStats(DistributedSystem ids, ServerLocation location) {
       // no need for atomic since only a single thread will be writing these
       this.stats = ids.createStatistics(type, "CacheClientUpdater-" + location);
     }
@@ -1872,25 +1866,29 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       this.stats.close();
     }
 
+    @Override
     public void incReceivedBytes(long v) {
       this.stats.incLong(receivedBytesId, v);
     }
 
+    @Override
     public void incSentBytes(long v) {
       // noop since we never send messages
     }
 
+    @Override
     public void incMessagesBeingReceived(int bytes) {
-      stats.incInt(messagesBeingReceivedId, 1);
+      this.stats.incInt(messagesBeingReceivedId, 1);
       if (bytes > 0) {
-        stats.incLong(messageBytesBeingReceivedId, bytes);
+        this.stats.incLong(messageBytesBeingReceivedId, bytes);
       }
     }
 
+    @Override
     public void decMessagesBeingReceived(int bytes) {
-      stats.incInt(messagesBeingReceivedId, -1);
+      this.stats.incInt(messagesBeingReceivedId, -1);
       if (bytes > 0) {
-        stats.incLong(messageBytesBeingReceivedId, -bytes);
+        this.stats.incLong(messageBytesBeingReceivedId, -bytes);
       }
     }
 
@@ -1904,7 +1902,8 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     }
   }
 
+  @Override
   public boolean isProcessing() {
-    return continueProcessing.get();
+    return this.continueProcessing.get();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
index 2a5a3d7..39c2f3a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
@@ -22,7 +22,6 @@ import org.apache.geode.internal.logging.LogService;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.apache.logging.log4j.Logger;
@@ -36,7 +35,7 @@ import org.apache.logging.log4j.Logger;
  * 
  * <PRE>
  * 
- * msgType - int - 4 bytes type of message, types enumerated below
+ * messageType - int - 4 bytes type of message, types enumerated below
  * 
  * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained
  * in the payload. Message can be a multi-part message
@@ -153,7 +152,8 @@ public class ChunkedMessage extends Message {
 
   public void setLastChunkAndNumParts(boolean lastChunk, int numParts) {
     setLastChunk(lastChunk);
-    if (this.sc != null && this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) {
+    if (this.serverConnection != null
+        && this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
       // we us e three bits for number of parts in last chunk byte
       // we us e three bits for number of parts in last chunk byte
       byte localLastChunk = (byte) (numParts << 5);
@@ -162,7 +162,7 @@ public class ChunkedMessage extends Message {
   }
 
   public void setServerConnection(ServerConnection servConn) {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
   }
 
@@ -209,7 +209,7 @@ public class ChunkedMessage extends Message {
         // Set the header and payload fields only after receiving all the
         // socket data, providing better message consistency in the face
         // of exceptional conditions (e.g. IO problems, timeouts etc.)
-        this.msgType = type;
+        this.messageType = type;
         this.numberOfParts = numParts; // Already set in setPayloadFields via setNumberOfParts
         this.transactionId = txid;
       }
@@ -241,14 +241,15 @@ public class ChunkedMessage extends Message {
     int totalBytesRead = 0;
     do {
       int bytesRead = 0;
-      bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
+      bytesRead =
+          inputStream.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - totalBytesRead);
       if (bytesRead == -1) {
         throw new EOFException(
             LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString());
       }
       totalBytesRead += bytesRead;
-      if (this.msgStats != null) {
-        this.msgStats.incReceivedBytes(bytesRead);
+      if (this.messageStats != null) {
+        this.messageStats.incReceivedBytes(bytesRead);
       }
     } while (totalBytesRead < CHUNK_HEADER_LENGTH);
 
@@ -315,7 +316,7 @@ public class ChunkedMessage extends Message {
    * Sends a chunk of this message.
    */
   public void sendChunk(ServerConnection servConn) throws IOException {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
     sendChunk();
   }
@@ -355,7 +356,7 @@ public class ChunkedMessage extends Message {
   protected void getHeaderBytesForWrite() {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    cb.putInt(this.msgType);
+    cb.putInt(this.messageType);
     cb.putInt(this.numberOfParts);
 
     cb.putInt(this.transactionId);


Mime
View raw message