geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [2/2] geode git commit: 2632: refactor code to use InternalCache instead of GemFireCacheImpl
Date Wed, 19 Apr 2017 21:46:40 GMT
2632: refactor code to use InternalCache instead of GemFireCacheImpl

* minor cleanup also


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/bc39e973
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/bc39e973
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/bc39e973

Branch: refs/heads/feature/GEODE-2632-5
Commit: bc39e97390d23537428ea3e7cd6c2e553f54bc73
Parents: 76c4983
Author: Kirk Lund <klund@apache.org>
Authored: Wed Apr 19 14:41:42 2017 -0700
Committer: Kirk Lund <klund@apache.org>
Committed: Wed Apr 19 14:41:42 2017 -0700

----------------------------------------------------------------------
 .../query/internal/cq/CqServiceProvider.java    |  19 +-
 .../query/internal/cq/spi/CqServiceFactory.java |   8 +-
 .../cache/query/internal/cq/ClientCQImpl.java   |  94 +--
 .../cache/query/internal/cq/CqQueryImpl.java    |  90 ++-
 .../query/internal/cq/CqServiceFactoryImpl.java |  17 +-
 .../cache/query/internal/cq/CqServiceImpl.java  | 668 ++++---------------
 .../internal/cq/CqServiceStatisticsImpl.java    |  21 +-
 .../query/internal/cq/CqServiceVsdStats.java    |  73 +-
 .../cache/tier/sockets/command/ExecuteCQ.java   |   4 +-
 .../cache/tier/sockets/command/ExecuteCQ61.java |   4 +-
 .../tier/sockets/command/GetDurableCQs.java     |   4 +-
 .../cache/query/cq/dunit/CqStatsDUnitTest.java  |  32 +-
 .../cq/dunit/CqStatsUsingPoolDUnitTest.java     |  35 +-
 .../TopEntriesFunctionCollector.java            |  18 +-
 14 files changed, 316 insertions(+), 771 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
index cded9c3..d629352 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java
@@ -16,7 +16,7 @@ package org.apache.geode.cache.query.internal.cq;
 
 import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 
 import java.io.DataInput;
 import java.io.IOException;
@@ -26,17 +26,19 @@ import java.util.ServiceLoader;
 public class CqServiceProvider {
 
   private static final CqServiceFactory factory;
-  // System property to maintain the CQ event references for optimizing the updates.
-  // This will allows to run the CQ query only once during update events.
+
+  /**
+   * System property to maintain the CQ event references for optimizing the updates.
+   * This will allow running the CQ query only once during update events.
+   */
   public static boolean MAINTAIN_KEYS = Boolean
-      .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"))
-      .booleanValue();
+      .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true"));
+
   /**
    * A debug flag used for testing vMotion during CQ registration
    */
   public static boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false;
 
-
   static {
     ServiceLoader<CqServiceFactory> loader = ServiceLoader.load(CqServiceFactory.class);
     Iterator<CqServiceFactory> itr = loader.iterator();
@@ -48,8 +50,7 @@ public class CqServiceProvider {
     }
   }
 
-  public static CqService create(GemFireCacheImpl cache) {
-
+  public static CqService create(InternalCache cache) {
     if (factory == null) {
       return new MissingCqService();
     }
@@ -63,10 +64,8 @@ public class CqServiceProvider {
     } else {
       return factory.readCqQuery(in);
     }
-
   }
 
   private CqServiceProvider() {
-
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
index 68ebbd5..2b8a47e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java
@@ -19,16 +19,16 @@ import java.io.IOException;
 
 import org.apache.geode.cache.query.internal.cq.CqService;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 
 public interface CqServiceFactory {
 
-  public void initialize();
+  void initialize();
 
   /**
    * Create a new CqService for the given cache
    */
-  public CqService create(GemFireCacheImpl cache);
+  CqService create(InternalCache cache);
 
-  public ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
+  ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
index 00a0aa5..1a331da 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java
@@ -35,7 +35,7 @@ import org.apache.geode.cache.query.CqResults;
 import org.apache.geode.cache.query.CqStatusListener;
 import org.apache.geode.cache.query.RegionNotFoundException;
 import org.apache.geode.cache.query.internal.CqStateImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -57,7 +57,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    */
   private volatile ConcurrentLinkedQueue<CqEventImpl> queuedEvents = null;
 
-  public final Object queuedEventsSynchObject = new Object();
+  final Object queuedEventsSynchObject = new Object();
 
   private boolean connected = false;
 
@@ -73,22 +73,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     return this.cqName;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCQProxy()
-   */
-  public ServerCQProxyImpl getCQProxy() {
+  ServerCQProxyImpl getCQProxy() {
     return this.cqProxy;
   }
 
   /**
    * Initializes the connection using the pool from the client region. Also sets the cqBaseRegion
    * value of this CQ.
-   * 
-   * @throws CqException
    */
-  public void initConnectionProxy() throws CqException, RegionNotFoundException {
+  private void initConnectionProxy() throws CqException, RegionNotFoundException {
     cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName);
     // Check if the region exists on the local cache.
     // In the current implementation of 5.1 the Server Connection is (ConnectionProxyImpl)
@@ -113,17 +106,9 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       throw new CqException(
           "Unable to get the connection pool. The Region does not have a pool configured.");
     }
-
-    // if (proxy == null) {
-    // throw new
-    // CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_GET_THE_CONNECTIONPROXY_THE_REGION_MAY_NOT_HAVE_A_BRIDGEWRITER_OR_BRIDGECLIENT_INSTALLED_ON_IT.toLocalizedString());
-    // } else if(!proxy.getEstablishCallbackConnection()){
-    // throw new
-    // CqException(LocalizedStrings.CqQueryImpl_THE_ESTABLISHCALLBACKCONNECTION_ON_BRIDGEWRITER_CLIENT_INSTALLED_ON_REGION_0_IS_SET_TO_FALSE
-    // .toLocalizedString(regionName));
-    // }
   }
 
+  @Override
   public void close() throws CqClosedException, CqException {
     this.close(true);
   }
@@ -182,15 +167,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       if (cqProxy == null || !sendRequestToServer || isClosed) {
         // Stat update.
         if (stateBeforeClosing == CqStateImpl.RUNNING) {
-          cqService.stats.decCqsActive();
+          cqService.stats().decCqsActive();
         } else if (stateBeforeClosing == CqStateImpl.STOPPED) {
-          cqService.stats.decCqsStopped();
+          cqService.stats().decCqsStopped();
         }
 
         // Set the state to close, and update stats
         this.cqState.setState(CqStateImpl.CLOSED);
-        cqService.stats.incCqsClosed();
-        cqService.stats.decCqsOnClient();
+        cqService.stats().incCqsClosed();
+        cqService.stats().decCqsOnClient();
         if (this.stats != null)
           this.stats.close();
       } else {
@@ -201,7 +186,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (exception != null) {
           throw new CqException(
               LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_ERROR_FROM_LAST_ENDPOINT_1
-                  .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+                  .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
               exception.getCause());
         } else {
           throw new CqException(
@@ -261,31 +246,28 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
 
   /**
    * Clears the resource used by CQ.
-   * 
-   * @throws CqException
    */
+  @Override
   protected void cleanup() throws CqException {
     this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, this.getServerCqName());
   }
 
+  @Override
   public CqAttributes getCqAttributes() {
     return cqAttributes;
   }
 
-
-
   /**
    * @return Returns the cqListeners.
    */
   public CqListener[] getCqListeners() {
-
     return cqAttributes.getCqListeners();
   }
 
-
   /**
    * Start or resume executing the query.
    */
+  @Override
   public void execute() throws CqClosedException, RegionNotFoundException, CqException {
     executeCqOnRedundantsAndPrimary(false);
   }
@@ -293,7 +275,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
   /**
    * Start or resume executing the query. Gets or updates the CQ results and returns them.
    */
-  public CqResults executeWithInitialResults()
+  @Override
+  public <E> CqResults<E> executeWithInitialResults()
       throws CqClosedException, RegionNotFoundException, CqException {
 
     synchronized (queuedEventsSynchObject) {
@@ -320,16 +303,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     CqResults initialResults;
     try {
       initialResults = (CqResults) executeCqOnRedundantsAndPrimary(true);
-    } catch (CqClosedException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (RegionNotFoundException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (CqException e) {
-      queuedEvents = null;
-      throw e;
-    } catch (RuntimeException e) {
+    } catch (RegionNotFoundException|CqException|RuntimeException e) {
       queuedEvents = null;
       throw e;
     }
@@ -343,6 +317,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (!this.queuedEvents.isEmpty()) {
           try {
             Runnable r = new Runnable() {
+              @Override
               public void run() {
                 Object[] eventArray = null;
                 if (CqQueryImpl.testHook != null) {
@@ -395,7 +370,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    * @param executeWithInitialResults boolean
    * @return Object SelectResults in case of executeWithInitialResults
    */
-  public Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
+  private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults)
       throws CqClosedException, RegionNotFoundException, CqException {
 
     Object initialResults = null;
@@ -462,7 +437,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
             String errMsg =
                 LocalizedStrings.CqQueryImpl_FAILED_TO_EXECUTE_THE_CQ_CQNAME_0_QUERY_STRING_IS_1_ERROR_FROM_LAST_SERVER_2
                     .toLocalizedString(
-                        new Object[] {this.cqName, this.queryString, ex.getLocalizedMessage()});
+                        this.cqName, this.queryString, ex.getLocalizedMessage());
             if (logger.isDebugEnabled()) {
               logger.debug(errMsg, ex);
             }
@@ -498,8 +473,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       }
     }
     // Update CQ-base region for book keeping.
-    this.cqService.stats.incCqsActive();
-    this.cqService.stats.decCqsStopped();
+    this.cqService.stats().incCqsActive();
+    this.cqService.stats().decCqsStopped();
     return initialResults;
   }
 
@@ -509,23 +484,22 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
    * @return true if shutdown in progress else false.
    */
   private boolean shutdownInProgress() {
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    InternalCache cache = cqService.getInternalCache();
     if (cache == null || cache.isClosed()) {
       return true; // bail, things are shutting down
     }
 
-
     String reason = cqProxy.getPool().getCancelCriterion().cancelInProgress();
     if (reason != null) {
       return true;
     }
     return false;
-
   }
 
   /**
    * Stop or pause executing the query.
    */
+  @Override
   public void stop() throws CqClosedException, CqException {
     boolean isStopped = false;
     synchronized (this.cqState) {
@@ -558,8 +532,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
       if (cqProxy == null || isStopped) {
         // Change state and stats on the client side
         this.cqState.setState(CqStateImpl.STOPPED);
-        this.cqService.stats.incCqsStopped();
-        this.cqService.stats.decCqsActive();
+        this.cqService.stats().incCqsStopped();
+        this.cqService.stats().decCqsActive();
         if (logger.isDebugEnabled()) {
           logger.debug("Successfully stopped the CQ. {}", cqName);
         }
@@ -568,7 +542,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
         if (exception != null) {
           throw new CqException(
               LocalizedStrings.CqQueryImpl_FAILED_TO_STOP_THE_CQ_CQNAME_0_ERROR_FROM_LAST_SERVER_1
-                  .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}),
+                  .toLocalizedString(this.cqName, exception.getLocalizedMessage()),
               exception.getCause());
         } else {
           throw new CqException(
@@ -579,24 +553,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     }
   }
 
+  @Override
   public CqAttributesMutator getCqAttributesMutator() {
     return (CqAttributesMutator) this.cqAttributes;
   }
 
-
-  public ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
+  ConcurrentLinkedQueue<CqEventImpl> getQueuedEvents() {
     return this.queuedEvents;
   }
 
-
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqQuery2#setProxyCache(org.apache.geode.cache.
-   * client.internal.ProxyCache)
-   */
   @Override
   public void setProxyCache(ProxyCache proxyCache) {
     this.proxyCache = proxyCache;
@@ -612,7 +577,6 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
 
   @Override
   public void createOn(Connection conn, boolean isDurable) {
-
     byte regionDataPolicyOrdinal = getCqBaseRegion() == null ? (byte) 0
         : getCqBaseRegion().getAttributes().getDataPolicy().ordinal;
 
@@ -620,6 +584,4 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ {
     this.cqProxy.createOn(getName(), conn, getQueryString(), state, isDurable,
         regionDataPolicyOrdinal);
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
index 22b4137..a76cb62 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java
@@ -21,11 +21,9 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.StatisticsFactory;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.query.CqClosedException;
 import org.apache.geode.cache.query.CqEvent;
 import org.apache.geode.cache.query.CqException;
-import org.apache.geode.cache.query.CqExistsException;
 import org.apache.geode.cache.query.CqState;
 import org.apache.geode.cache.query.CqStatistics;
 import org.apache.geode.cache.query.Query;
@@ -38,7 +36,7 @@ import org.apache.geode.cache.query.internal.CqStateImpl;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.QueryExecutionContext;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.InternalLogWriter;
@@ -58,13 +56,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   protected String queryString;
 
-  protected static final Object TOKEN = new Object();
+  static final Object TOKEN = new Object();
 
-  protected LocalRegion cqBaseRegion;
+  LocalRegion cqBaseRegion;
 
   protected Query query = null;
 
-  protected InternalLogWriter securityLogWriter;
+  InternalLogWriter securityLogWriter;
 
   protected CqServiceImpl cqService;
 
@@ -72,14 +70,14 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   protected boolean isDurable = false;
 
-  // Stats counters
-  protected CqStatisticsImpl cqStats;
+  /** Stats counters */
+  private CqStatisticsImpl cqStats;
 
   protected CqQueryVsdStats stats;
 
   protected final CqStateImpl cqState = new CqStateImpl();
 
-  protected ExecutionContext queryExecutionContext = null;
+  private ExecutionContext queryExecutionContext = null;
 
   public static TestHook testHook = null;
 
@@ -100,6 +98,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * returns CQ name
    */
+  @Override
   public String getName() {
     return this.cqName;
   }
@@ -109,6 +108,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     this.cqName = cqName;
   }
 
+  @Override
   public void setCqService(CqService cqService) {
     this.cqService = (CqServiceImpl) cqService;
   }
@@ -121,25 +121,25 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     return this.regionName;
   }
 
-  public void updateCqCreateStats() {
+  void updateCqCreateStats() {
     // Initialize the VSD statistics
     StatisticsFactory factory = cqService.getCache().getDistributedSystem();
     this.stats = new CqQueryVsdStats(factory, getServerCqName());
     this.cqStats = new CqStatisticsImpl(this);
 
     // Update statistics with CQ creation.
-    this.cqService.stats.incCqsStopped();
-    this.cqService.stats.incCqsCreated();
-    this.cqService.stats.incCqsOnClient();
+    this.cqService.stats().incCqsStopped();
+    this.cqService.stats().incCqsCreated();
+    this.cqService.stats().incCqsOnClient();
   }
 
   /**
    * Validates the CQ. Checks for cq constraints. Also sets the base region name.
    */
-  public void validateCq() {
-    Cache cache = cqService.getCache();
+  void validateCq() {
+    InternalCache cache = cqService.getInternalCache();
     DefaultQuery locQuery =
-        (DefaultQuery) ((GemFireCacheImpl) cache).getLocalQueryService().newQuery(this.queryString);
+        (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString);
     this.query = locQuery;
     // assert locQuery != null;
 
@@ -221,10 +221,8 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
   /**
    * Removes the CQ from CQ repository.
-   * 
-   * @throws CqException
    */
-  protected void removeFromCqMap() throws CqException {
+  void removeFromCqMap() throws CqException {
     try {
       cqService.removeCq(this.getServerCqName());
     } catch (Exception ex) {
@@ -243,6 +241,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * Returns the QueryString of this CQ.
    */
+  @Override
   public String getQueryString() {
     return queryString;
   }
@@ -252,23 +251,16 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return the Query for the query string
    */
+  @Override
   public Query getQuery() {
     return query;
   }
 
-
-  /**
-   * @see org.apache.geode.cache.query.CqQuery#getStatistics()
-   */
+  @Override
   public CqStatistics getStatistics() {
     return cqStats;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCqBaseRegion()
-   */
   @Override
   public LocalRegion getCqBaseRegion() {
     return this.cqBaseRegion;
@@ -279,11 +271,12 @@ public abstract class CqQueryImpl implements InternalCqQuery {
   /**
    * @return Returns the Region name on which this cq is created.
    */
-  public String getBaseRegionName() {
+  String getBaseRegionName() {
 
     return this.regionName;
   }
 
+  @Override
   public abstract String getServerCqName();
 
   /**
@@ -291,15 +284,11 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return STOPPED RUNNING or CLOSED
    */
+  @Override
   public CqState getState() {
     return this.cqState;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setCqState(int)
-   */
   @Override
   public void setCqState(int state) {
     if (this.isClosed()) {
@@ -309,18 +298,13 @@ public abstract class CqQueryImpl implements InternalCqQuery {
 
     synchronized (cqState) {
       if (state == CqStateImpl.RUNNING) {
-        if (this.isRunning()) {
-          // throw new
-          // IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0
-          // .toLocalizedString(this.cqName));
-        }
         this.cqState.setState(CqStateImpl.RUNNING);
-        this.cqService.stats.incCqsActive();
-        this.cqService.stats.decCqsStopped();
+        this.cqService.stats().incCqsActive();
+        this.cqService.stats().decCqsStopped();
       } else if (state == CqStateImpl.STOPPED) {
         this.cqState.setState(CqStateImpl.STOPPED);
-        this.cqService.stats.incCqsStopped();
-        this.cqService.stats.decCqsActive();
+        this.cqService.stats().incCqsStopped();
+        this.cqService.stats().decCqsActive();
       } else if (state == CqStateImpl.CLOSING) {
         this.cqState.setState(state);
       }
@@ -332,7 +316,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @param cqEvent object
    */
-  public void updateStats(CqEvent cqEvent) {
+  void updateStats(CqEvent cqEvent) {
     this.stats.updateStats(cqEvent); // Stats for VSD
   }
 
@@ -341,15 +325,17 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if running, false otherwise
    */
+  @Override
   public boolean isRunning() {
     return this.cqState.isRunning();
   }
 
   /**
-   * Return true if the CQ is in Sstopped state
+   * Return true if the CQ is in stopped state
    * 
    * @return true if stopped, false otherwise
    */
+  @Override
   public boolean isStopped() {
     return this.cqState.isStopped();
   }
@@ -359,6 +345,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if closed, false otherwise
    */
+  @Override
   public boolean isClosed() {
     return this.cqState.isClosed();
   }
@@ -377,6 +364,7 @@ public abstract class CqQueryImpl implements InternalCqQuery {
    * 
    * @return true if durable, false otherwise
    */
+  @Override
   public boolean isDurable() {
     return this.isDurable;
   }
@@ -391,22 +379,22 @@ public abstract class CqQueryImpl implements InternalCqQuery {
     return stats;
   }
 
-  public ExecutionContext getQueryExecutionContext() {
+  ExecutionContext getQueryExecutionContext() {
     return queryExecutionContext;
   }
 
-  public void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
+  private void setQueryExecutionContext(ExecutionContext queryExecutionContext) {
     this.queryExecutionContext = queryExecutionContext;
   }
 
   /** Test Hook */
   public interface TestHook {
-    public void pauseUntilReady();
+    void pauseUntilReady();
 
-    public void ready();
+    void ready();
 
-    public int numQueuedEvents();
+    int numQueuedEvents();
 
-    public void setEventCount(int count);
+    void setEventCount(int count);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
index db90632..9cc2eea 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java
@@ -22,7 +22,7 @@ import java.util.Map;
 
 import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory;
 import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.CommandInitializer;
@@ -36,14 +36,13 @@ import org.apache.geode.internal.cache.tier.sockets.command.StopCQ;
 
 public class CqServiceFactoryImpl implements CqServiceFactory {
 
+  @Override
   public void initialize() {
-    {
-      Map<Version, Command> versions = new HashMap<Version, Command>();
-      versions.put(Version.GFE_57, ExecuteCQ.getCommand());
-      versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
-      CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
-      CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
-    }
+    Map<Version, Command> versions = new HashMap<>();
+    versions.put(Version.GFE_57, ExecuteCQ.getCommand());
+    versions.put(Version.GFE_61, ExecuteCQ61.getCommand());
+    CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions);
+    CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions);
 
     CommandInitializer.registerCommand(MessageType.GETCQSTATS_MSG_TYPE,
         Collections.singletonMap(Version.GFE_57, GetCQStats.getCommand()));
@@ -58,7 +57,7 @@ public class CqServiceFactoryImpl implements CqServiceFactory {
   }
 
   @Override
-  public CqService create(GemFireCacheImpl cache) {
+  public CqService create(InternalCache cache) {
     return new CqServiceImpl(cache);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
index f1ca832..d0a8176 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java
@@ -14,19 +14,63 @@
  */
 package org.apache.geode.cache.query.internal.cq;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.InvalidDeltaException;
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.client.Pool;
-import org.apache.geode.cache.client.internal.*;
-import org.apache.geode.cache.query.*;
-import org.apache.geode.cache.query.internal.*;
+import org.apache.geode.cache.client.internal.GetEventValueOp;
+import org.apache.geode.cache.client.internal.InternalPool;
+import org.apache.geode.cache.client.internal.QueueManager;
+import org.apache.geode.cache.client.internal.ServerCQProxyImpl;
+import org.apache.geode.cache.client.internal.UserAttributes;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqClosedException;
+import org.apache.geode.cache.query.CqException;
+import org.apache.geode.cache.query.CqExistsException;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.CqQuery;
+import org.apache.geode.cache.query.CqServiceStatistics;
+import org.apache.geode.cache.query.CqStatusListener;
+import org.apache.geode.cache.query.QueryException;
+import org.apache.geode.cache.query.QueryInvalidException;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.internal.CompiledSelect;
+import org.apache.geode.cache.query.internal.CqQueryVsdStats;
+import org.apache.geode.cache.query.internal.CqStateImpl;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
@@ -35,31 +79,19 @@ import org.apache.geode.internal.cache.tier.sockets.Part;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.logging.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
- * @since GemFire 5.5
- *
- *        Implements the CqService functionality.
- * 
- */
-/**
+ * Implements the CqService functionality.
  *
+ * @since GemFire 5.5
  */
 public final class CqServiceImpl implements CqService {
   private static final Logger logger = LogService.getLogger();
 
-  private static final Integer MESSAGE_TYPE_LOCAL_CREATE =
-      Integer.valueOf(MessageType.LOCAL_CREATE);
-  private static final Integer MESSAGE_TYPE_LOCAL_UPDATE =
-      Integer.valueOf(MessageType.LOCAL_UPDATE);
-  private static final Integer MESSAGE_TYPE_LOCAL_DESTROY =
-      Integer.valueOf(MessageType.LOCAL_DESTROY);
-  private static final Integer MESSAGE_TYPE_EXCEPTION = Integer.valueOf(MessageType.EXCEPTION);
+  private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE;
+  private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE;
+  private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY;
+  private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION;
 
   /**
    * System property to evaluate the query even though the initial results are not required when cq
@@ -67,25 +99,24 @@ public final class CqServiceImpl implements CqService {
    */
   public static boolean EXECUTE_QUERY_DURING_INIT = Boolean
       .valueOf(System
-          .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"))
-      .booleanValue();
+          .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"));
 
   private static final String CQ_NAME_PREFIX = "GfCq";
 
-  private final Cache cache;
+  private final InternalCache cache;
 
   /**
    * Manages cq pools to determine if a status of connect or disconnect needs to be sent out
    */
-  private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<String, Boolean>();
-
+  private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<>();
 
   /**
    * Manages CQ objects. uses serverCqName as key and CqQueryImpl as value
    * 
-   * @guarded.By cqQueryMapLock
+   * Writers copy and replace this map while holding cqQueryMapLock; readers use it lock-free.
    */
-  private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<String, CqQueryImpl>();
+  private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<>();
+
   private final Object cqQueryMapLock = new Object();
 
   private volatile boolean isRunning = false;
@@ -93,36 +124,21 @@ public final class CqServiceImpl implements CqService {
   /**
    * Used by client when multiuser-authentication is true.
    */
-  private final HashMap<String, UserAttributes> cqNameToUserAttributesMap =
-      new HashMap<String, UserAttributes>();
-
-  // private boolean isServer = true;
-
-  /*
-   * // Map to manage CQ to satisfied CQ events (keys) for optimizing updates. private final HashMap
-   * cqToCqEventKeysMap = CqService.MAINTAIN_KEYS ? new HashMap() : null;
-   */
+  private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<>();
 
   // Map to manage the similar CQs (having same query - performance optimization).
   // With query as key and Set of CQs as values.
   private final ConcurrentHashMap matchingCqMap;
 
   // CQ Service statistics
-  public final CqServiceStatisticsImpl cqServiceStats;
-  public final CqServiceVsdStats stats;
+  private final CqServiceStatisticsImpl cqServiceStats;
+  private final CqServiceVsdStats stats;
 
   // CQ identifier, also used in auto generated CQ names
   private volatile long cqId = 1;
 
-  /**
-   * Used to synchronize access to CQs in the repository
-   */
-  final Object cqSync = new Object();
-
   /* This is to manage region to CQs map, client side book keeping. */
-  private HashMap<String, ArrayList<String>> baseRegionToCqNameMap =
-      new HashMap<String, ArrayList<String>>();
-
+  private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<>();
 
   /**
    * Access and modification to the contents of this map do not necessarily need to be lock
@@ -135,33 +151,24 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Constructor.
-   * 
-   * @param c The cache used for the service
+   *
+   * @param cache The cache used for the service
    */
-  public CqServiceImpl(final Cache c) {
-    if (c == null) {
+  public CqServiceImpl(final InternalCache cache) {
+    if (cache == null) {
       throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString());
     }
-    GemFireCacheImpl gfc = (GemFireCacheImpl) c;
-    gfc.getCancelCriterion().checkCancelInProgress(null);
-
-    this.cache = gfc;
+    cache.getCancelCriterion().checkCancelInProgress(null);
 
+    this.cache = cache;
 
     // Initialize the Map which maintains the matching cqs.
     this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>();
 
     // Initialize the VSD statistics
-    StatisticsFactory factory = cache.getDistributedSystem();
+    StatisticsFactory factory = this.cache.getDistributedSystem();
     this.stats = new CqServiceVsdStats(factory);
     this.cqServiceStats = new CqServiceStatisticsImpl(this);
-
-    // final LoggingThreadGroup group =
-    // LoggingThreadGroup.createThreadGroup("CqExecutor Threads", logger);
-
-    // if (this.cache.getCacheServers().isEmpty()) {
-    // isServer = false;
-    // }
   }
 
   /**
@@ -171,13 +178,14 @@ public final class CqServiceImpl implements CqService {
     return this.cache;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#newCq(java.lang.String,
-   * java.lang.String, org.apache.geode.cache.query.CqAttributes,
-   * org.apache.geode.cache.client.internal.ServerCQProxy, boolean)
-   */
+  public InternalCache getInternalCache() {
+    return this.cache;
+  }
+
+  public CqServiceVsdStats stats() {
+    return this.stats;
+  }
+
   @Override
   public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes,
       InternalPool pool, boolean isDurable)
@@ -242,22 +250,15 @@ public final class CqServiceImpl implements CqService {
     return cQuery;
   }
 
-
   /**
    * Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and
    * executes. This is called on the Server.
    * 
-   * @param cqName
-   * @param queryString
-   * @param cqState
-   * @param clientProxyId
-   * @param ccn
    * @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN
    * @param regionDataPolicy the data policy of the region associated with the query. This is only
    *        needed if manageEmptyRegions is true.
    * @param emptyRegionsMap map of empty regions.
    * @throws IllegalStateException if this is called at client side.
-   * @throws CqException
    */
   @Override
   public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState,
@@ -271,7 +272,7 @@ public final class CqServiceImpl implements CqService {
     }
 
     String serverCqName = constructServerCqName(cqName, clientProxyId);
-    ServerCQImpl cQuery = null;
+    ServerCQImpl cQuery;
 
     // If this CQ is not yet registered in Server, register CQ.
     if (!isCqExists(serverCqName)) {
@@ -292,7 +293,6 @@ public final class CqServiceImpl implements CqService {
         logger.info(LocalizedMessage.create(
             LocalizedStrings.CqService_EXCEPTION_WHILE_REGISTERING_CQ_ON_SERVER_CQNAME___0,
             cQuery.getName()));
-        cQuery = null;
         throw cqe;
       }
 
@@ -308,6 +308,7 @@ public final class CqServiceImpl implements CqService {
     return cQuery;
   }
 
+  @Override
   public void resumeCQ(int cqState, ServerCQ cQuery) {
     // Initialize the state of CQ.
     if (((CqStateImpl) cQuery.getState()).getState() != cqState) {
@@ -324,25 +325,10 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
-   * public void addToCqEventKeysMap(CqQuery cq){ if (cqToCqEventKeysMap != null) { synchronized
-   * (cqToCqEventKeysMap){ String serverCqName = ((CqQueryImpl)cq).getServerCqName(); if
-   * (!cqToCqEventKeysMap.containsKey(serverCqName)){ cqToCqEventKeysMap.put(serverCqName, new
-   * HashSet()); if (_logger.isDebugEnabled()) {
-   * _logger.debug("CQ Event key maintenance for CQ, CqName: " + serverCqName + " is Enabled." +
-   * " key maintenance map size is: " + cqToCqEventKeysMap.size()); } } } // synchronized } }
-   */
-
-  public boolean hasCq() {
-    HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    return (cqMap.size() > 0);
-  }
-
-
   /**
    * Adds the given CQ and cqQuery object into the CQ map.
    */
-  public void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
+  void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
     // On server side cqName will be server side cqName.
     String sCqName = cq.getServerCqName();
     if (logger.isDebugEnabled()) {
@@ -355,7 +341,7 @@ public final class CqServiceImpl implements CqService {
               .toLocalizedString(sCqName));
     }
     synchronized (cqQueryMapLock) {
-      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
       try {
         tmpCqQueryMap.put(sCqName, cq);
       } catch (Exception ex) {
@@ -377,66 +363,34 @@ public final class CqServiceImpl implements CqService {
   /**
    * Removes given CQ from the cqMap..
    */
-  public void removeCq(String cqName) {
+  void removeCq(String cqName) {
     // On server side cqName will be server side cqName.
     synchronized (cqQueryMapLock) {
-      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<String, CqQueryImpl>(cqQueryMap);
+      HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
       tmpCqQueryMap.remove(cqName);
       this.cqNameToUserAttributesMap.remove(cqName);
       cqQueryMap = tmpCqQueryMap;
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#getClientCqFromServer(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID, java.lang.String)
-   */
   @Override
   public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) {
     // On server side cqName will be server side cqName.
     HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    return (CqQuery) cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
+    return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getCq(java.lang.String)
-   */
   @Override
   public InternalCqQuery getCq(String cqName) {
     // On server side cqName will be server side cqName.
-    return (InternalCqQuery) cqQueryMap.get(cqName);
+    return cqQueryMap.get(cqName);
   }
 
-  /**
-   * Clears the CQ Query Map.
-   */
-  public void clearCqQueryMap() {
-    // On server side cqName will be server side cqName.
-    synchronized (cqQueryMapLock) {
-      cqQueryMap = new HashMap<String, CqQueryImpl>();
-    }
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs()
-   */
   @Override
   public Collection<? extends InternalCqQuery> getAllCqs() {
     return cqQueryMap.values();
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs(java.lang.String)
-   */
   @Override
   public Collection<? extends InternalCqQuery> getAllCqs(final String regionName)
       throws CqException {
@@ -445,7 +399,7 @@ public final class CqServiceImpl implements CqService {
           LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("regionName"));
     }
 
-    String[] cqNames = null;
+    String[] cqNames;
 
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
@@ -456,7 +410,7 @@ public final class CqServiceImpl implements CqService {
       cqs.toArray(cqNames);
     }
 
-    ArrayList<InternalCqQuery> cQueryList = new ArrayList<InternalCqQuery>();
+    ArrayList<InternalCqQuery> cQueryList = new ArrayList<>();
     for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) {
       InternalCqQuery cq = getCq(cqNames[cqCnt]);
       if (cq != null) {
@@ -467,34 +421,16 @@ public final class CqServiceImpl implements CqService {
     return cQueryList;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#executeAllClientCqs()
-   */
   @Override
   public synchronized void executeAllClientCqs() throws CqException {
     executeCqs(this.getAllCqs());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#executeAllRegionCqs(java.lang.String)
-   */
   @Override
   public synchronized void executeAllRegionCqs(final String regionName) throws CqException {
     executeCqs(getAllCqs(regionName));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#executeCqs(org.apache.geode.cache.query
-   * .CqQuery[])
-   */
   @Override
   public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs)
       throws CqException {
@@ -503,53 +439,31 @@ public final class CqServiceImpl implements CqService {
     }
     String cqName = null;
     for (InternalCqQuery internalCq : cqs) {
-      CqQuery cq = (CqQuery) internalCq;
+      CqQuery cq = internalCq;
       if (!cq.isClosed() && cq.isStopped()) {
         try {
           cqName = cq.getName();
           cq.execute();
-        } catch (QueryException qe) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
-                qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
+        } catch (QueryException|CqClosedException e) {
           if (logger.isDebugEnabled()) {
             logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
-                cce.getMessage());
+                e.getMessage());
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllClientCqs()
-   */
   @Override
   public synchronized void stopAllClientCqs() throws CqException {
     stopCqs(this.getAllCqs());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllRegionCqs(java.lang.String)
-   */
   @Override
   public synchronized void stopAllRegionCqs(final String regionName) throws CqException {
     stopCqs(this.getAllCqs(regionName));
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#stopCqs(org.apache.geode.cache.query.
-   * CqQuery[])
-   */
   @Override
   public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -567,29 +481,20 @@ public final class CqServiceImpl implements CqService {
 
     String cqName = null;
     for (InternalCqQuery internalCqQuery : cqs) {
-      CqQuery cq = (CqQuery) internalCqQuery;
+      CqQuery cq = internalCqQuery;
       if (!cq.isClosed() && cq.isRunning()) {
         try {
           cqName = cq.getName();
           cq.stop();
-        } catch (QueryException qe) {
-          if (isDebugEnabled) {
-            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
+        } catch (QueryException|CqClosedException e) {
           if (isDebugEnabled) {
-            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, cce.getMessage());
+            logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage());
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeCqs(java.lang.String)
-   */
   @Override
   public void closeCqs(final String regionName) throws CqException {
     Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName);
@@ -603,8 +508,8 @@ public final class CqServiceImpl implements CqService {
             // invoked on the server
             cq.close(false);
           } else {
-            // @todo grid: if regionName has a pool check its keepAlive
-            boolean keepAlive = ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive();
+            // TODO: grid: if regionName has a pool check its keepAlive
+            boolean keepAlive = this.cache.keepDurableSubscriptionsAlive();
             if (cq.isDurable() && keepAlive) {
               logger.warn(LocalizedMessage.create(
                   LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ));
@@ -614,14 +519,9 @@ public final class CqServiceImpl implements CqService {
             }
           }
 
-        } catch (QueryException qe) {
+        } catch (QueryException|CqClosedException e) {
           if (logger.isDebugEnabled()) {
-            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, qe.getMessage());
-          }
-        } catch (CqClosedException cce) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName,
-                cce.getMessage());
+            logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage());
           }
         }
       }
@@ -630,10 +530,6 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Called directly on server side.
-   * 
-   * @param cqName
-   * @param clientId
-   * @throws CqException
    */
   @Override
   public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException {
@@ -650,8 +546,6 @@ public final class CqServiceImpl implements CqService {
     try {
       HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
       if (!cqMap.containsKey(serverCqName)) {
-        // throw new
-        // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_STOP_THE_SPECIFIED_CQ_0.toLocalizedString(serverCqName));
         /*
          * gregp 052808: We should silently fail here instead of throwing error. This is to deal
          * with races in recovery
@@ -689,15 +583,8 @@ public final class CqServiceImpl implements CqService {
     }
     // Send stop message to peers.
     cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery);
-
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeCq(java.lang.String,
-   * org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException {
     String serverCqName = cqName;
@@ -713,9 +600,6 @@ public final class CqServiceImpl implements CqService {
     try {
       HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
       if (!cqMap.containsKey(serverCqName)) {
-        // throw new
-        // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0
-        // .toLocalizedString(serverCqName));
         /*
          * gregp 052808: We should silently fail here instead of throwing error. This is to deal
          * with races in recovery
@@ -791,12 +675,6 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean)
-   */
   @Override
   public void closeAllCqs(boolean clientInitiated) {
     closeAllCqs(clientInitiated, getAllCqs());
@@ -807,21 +685,13 @@ public final class CqServiceImpl implements CqService {
    * CqQuerys created by other VMs are unaffected.
    */
   private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) {
-    closeAllCqs(clientInitiated, cqs,
-        ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive());
+    closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive());
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean,
-   * org.apache.geode.cache.query.CqQuery[], boolean)
-   */
   @Override
   public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs,
       boolean keepAlive) {
 
-    // CqQuery[] cqs = getAllCqs();
     if (cqs != null) {
       String cqName = null;
       if (logger.isDebugEnabled()) {
@@ -830,7 +700,6 @@ public final class CqServiceImpl implements CqService {
       for (InternalCqQuery cQuery : cqs) {
         try {
           cqName = cQuery.getName();
-          // boolean keepAlive = ((GemFireCache)this.cache).keepDurableSubscriptionsAlive();
 
           if (isServer()) {
             cQuery.close(false);
@@ -847,47 +716,26 @@ public final class CqServiceImpl implements CqService {
               }
             }
           }
-        } catch (QueryException cqe) {
+        } catch (QueryException|CqClosedException e) {
           if (!isRunning()) {
             // Not cache shutdown
             logger
                 .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
-                    new Object[] {cqName, cqe.getMessage()}));
+                    new Object[] {cqName, e.getMessage()}));
           }
           if (logger.isDebugEnabled()) {
-            logger.debug(cqe.getMessage(), cqe);
-          }
-        } catch (CqClosedException cqe) {
-          if (!isRunning()) {
-            // Not cache shutdown
-            logger
-                .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1,
-                    new Object[] {cqName, cqe.getMessage()}));
-          }
-          if (logger.isDebugEnabled()) {
-            logger.debug(cqe.getMessage(), cqe);
+            logger.debug(e.getMessage(), e);
           }
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getCqStatistics()
-   */
   @Override
   public CqServiceStatistics getCqStatistics() {
     return cqServiceStats;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#closeClientCqs(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -899,30 +747,19 @@ public final class CqServiceImpl implements CqService {
       CqQueryImpl cQuery = (CqQueryImpl) cq;
       try {
         cQuery.close(false);
-      } catch (QueryException qe) {
+      } catch (QueryException|CqClosedException e) {
         if (isDebugEnabled) {
           logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              qe.getMessage());
-        }
-      } catch (CqClosedException cce) {
-        if (isDebugEnabled) {
-          logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              cce.getMessage());
+              e.getMessage());
         }
       }
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.geode.cache.query.internal.InternalCqService#getAllClientCqs(org.apache.geode.
-   * internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId) {
     Collection<? extends InternalCqQuery> cqs = getAllCqs();
-    ArrayList<ServerCQ> clientCqs = new ArrayList<ServerCQ>();
+    ArrayList<ServerCQ> clientCqs = new ArrayList<>();
 
     for (InternalCqQuery cq : cqs) {
       ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -934,23 +771,16 @@ public final class CqServiceImpl implements CqService {
     return clientCqs;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#getAllDurableClientCqs(org.apache.geode
-   * .internal.cache.tier.sockets.ClientProxyMembershipID)
-   */
   @Override
   public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId)
       throws CqException {
     if (clientProxyId == null) {
       throw new CqException(
           LocalizedStrings.CqService_UNABLE_TO_RETRIEVE_DURABLE_CQS_FOR_CLIENT_PROXY_ID
-              .toLocalizedString(clientProxyId));
+              .toLocalizedString(null));
     }
     List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
-    ArrayList<String> durableClientCqs = new ArrayList<String>();
+    ArrayList<String> durableClientCqs = new ArrayList<>();
 
     for (ServerCQ cq : cqs) {
       ServerCQImpl cQuery = (ServerCQImpl) cq;
@@ -966,9 +796,6 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Server side method. Closes non-durable CQs for the given client proxy id.
-   * 
-   * @param clientProxyId
-   * @throws CqException
    */
   @Override
   public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
@@ -983,15 +810,10 @@ public final class CqServiceImpl implements CqService {
         if (!cQuery.isDurable()) {
           cQuery.close(false);
         }
-      } catch (QueryException qe) {
-        if (isDebugEnabled) {
-          logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              qe.getMessage());
-        }
-      } catch (CqClosedException cce) {
+      } catch (QueryException|CqClosedException e) {
         if (isDebugEnabled) {
           logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
-              cce.getMessage());
+              e.getMessage());
         }
       }
     }
@@ -1028,6 +850,7 @@ public final class CqServiceImpl implements CqService {
     return this.isRunning;
   }
 
+  @Override
   public void start() {
     this.isRunning = true;
   }
@@ -1035,9 +858,10 @@ public final class CqServiceImpl implements CqService {
   /**
    * @return Returns the serverCqName.
    */
+  @Override
   public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
     ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache
-        .computeIfAbsent(cqName, key -> new ConcurrentHashMap<ClientProxyMembershipID, String>());
+        .computeIfAbsent(cqName, key -> new ConcurrentHashMap<>());
 
     String cName = cache.get(clientProxyId);
     if (null == cName) {
@@ -1065,7 +889,7 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
+  /**
    * Checks if CQ with the given name already exists.
    * 
    * @param cqName name of the CQ.
@@ -1073,17 +897,15 @@ public final class CqServiceImpl implements CqService {
    * @return true if exists else false.
    */
   private synchronized boolean isCqExists(String cqName) {
-    boolean status = false;
     HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
-    status = cqMap.containsKey(cqName);
-    return status;
+    return cqMap.containsKey(cqName);
   }
 
-  /*
+  /**
    * Generates a name for CQ. Checks if CQ with that name already exists if so generates a new
    * cqName.
    */
-  public synchronized String generateCqName() {
+  private synchronized String generateCqName() {
     while (true) {
       String cqName = CQ_NAME_PREFIX + (cqId++);
       if (!isCqExists(cqName)) {
@@ -1092,18 +914,9 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#dispatchCqListeners(java.util.HashMap,
-   * int, java.lang.Object, java.lang.Object, byte[],
-   * org.apache.geode.cache.client.internal.QueueManager, org.apache.geode.internal.cache.EventID)
-   */
   @Override
   public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key,
       Object value, byte[] delta, QueueManager qManager, EventID eventId) {
-    ClientCQImpl cQuery = null;
     Object[] fullValue = new Object[1];
     Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator();
     String cqName = null;
@@ -1112,7 +925,7 @@ public final class CqServiceImpl implements CqService {
       try {
         Map.Entry<String, Integer> entry = iter.next();
         cqName = entry.getKey();
-        cQuery = (ClientCQImpl) this.getCq(cqName);
+        ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
 
         if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
           if (isDebugEnabled) {
@@ -1122,7 +935,7 @@ public final class CqServiceImpl implements CqService {
           continue;
         }
 
-        Integer cqOp = (Integer) entry.getValue();
+        Integer cqOp = entry.getValue();
 
         // If Region destroy event, close the cq.
         if (cqOp.intValue() == MessageType.DESTROY_REGION) {
@@ -1136,8 +949,7 @@ public final class CqServiceImpl implements CqService {
         }
 
         // Construct CqEvent.
-        CqEventImpl cqEvent = null;
-        cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp.intValue()),
+        CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp),
             key, value, delta, qManager, eventId);
 
         // Update statistics
@@ -1181,11 +993,11 @@ public final class CqServiceImpl implements CqService {
     } // iteration.
   }
 
-  public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
+  void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
     invokeListeners(cqName, cQuery, cqEvent, null);
   }
 
-  public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
+  private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
       Object[] fullValue) {
     if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
       return;
@@ -1217,8 +1029,8 @@ public final class CqServiceImpl implements CqService {
             }
             Part result = (Part) GetEventValueOp
                 .executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null);
-            Object newVal = null;
-            if (result == null || (newVal = result.getObject()) == null) {
+            Object newVal = (result == null) ? null : result.getObject();
+            if (newVal == null) {
               if (!cache.getCancelCriterion().isCancelInProgress()) {
                 Exception ex =
                     new Exception("Failed to retrieve full value from server for eventID "
@@ -1231,7 +1043,7 @@ public final class CqServiceImpl implements CqService {
                 }
               }
             } else {
-              ((GemFireCacheImpl) this.cache).getCachePerfStats().incDeltaFullValuesRequested();
+              this.cache.getCachePerfStats().incDeltaFullValuesRequested();
               cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(),
                   cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(),
                   cqEvent.getQueueManager(), cqEvent.getEventID());
@@ -1278,7 +1090,7 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-  public void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
+  private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
     if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
       return;
     }
@@ -1335,12 +1147,8 @@ public final class CqServiceImpl implements CqService {
     }
   }
 
-
   /**
    * Returns the Operation for the given EnumListenerEvent type.
-   * 
-   * @param eventType
-   * @return Operation
    */
   private Operation getOperation(int eventType) {
     Operation op = null;
@@ -1372,15 +1180,6 @@ public final class CqServiceImpl implements CqService {
     return op;
   }
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.geode.cache.query.internal.InternalCqService#processEvents(org.apache.geode.cache.
-   * CacheEvent, org.apache.geode.distributed.internal.DistributionAdvisor.Profile,
-   * org.apache.geode.distributed.internal.DistributionAdvisor.Profile[],
-   * org.apache.geode.internal.cache.FilterRoutingInfo)
-   */
   @Override
   public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles,
       FilterRoutingInfo frInfo) throws CqException {
@@ -1421,7 +1220,7 @@ public final class CqServiceImpl implements CqService {
         continue;
       }
       Map cqs = pf.getCqMap();
-      HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+      HashMap<Long, Integer> cqInfo = new HashMap<>();
       Iterator cqIter = cqs.entrySet().iterator();
       while (cqIter.hasNext()) {
         Map.Entry cqEntry = (Map.Entry) cqIter.next();
@@ -1454,10 +1253,10 @@ public final class CqServiceImpl implements CqService {
   private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles,
       FilterRoutingInfo frInfo) throws CqException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<Object>();
-    HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<Object>();
-    boolean b_cqResults_newValue = false;
-    boolean b_cqResults_oldValue = false;
+    HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<>();
+    HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<>();
+    boolean b_cqResults_newValue;
+    boolean b_cqResults_oldValue;
     boolean queryOldValue;
     EntryEvent entryEvent = (EntryEvent) event;
     Object eventKey = entryEvent.getKey();
@@ -1472,8 +1271,8 @@ public final class CqServiceImpl implements CqService {
         || event.getOperation().isDestroy() || event.getOperation().isInvalidate()
         || (event.getOperation().isCreate() && isDupEvent));
 
-    HashMap<String, Integer> matchedCqs = new HashMap<String, Integer>();
-    long executionStartTime = 0;
+    HashMap<String, Integer> matchedCqs = new HashMap<>();
+    long executionStartTime;
     for (int i = -1; i < profiles.length; i++) {
       CacheProfile cf;
       if (i < 0) {
@@ -1498,7 +1297,6 @@ public final class CqServiceImpl implements CqService {
         continue;
       }
 
-
       // Get new value. If its not retrieved.
       if (cqUnfilteredEventsSet_newValue.isEmpty()
           && (event.getOperation().isCreate() || event.getOperation().isUpdate())) {
@@ -1509,7 +1307,7 @@ public final class CqServiceImpl implements CqService {
         }
       }
 
-      HashMap<Long, Integer> cqInfo = new HashMap<Long, Integer>();
+      HashMap<Long, Integer> cqInfo = new HashMap<>();
       Iterator cqIter = cqs.entrySet().iterator();
 
       while (cqIter.hasNext()) {
@@ -1546,7 +1344,6 @@ public final class CqServiceImpl implements CqService {
           }
         } else {
           boolean error = false;
-          // synchronized (cQuery)
           {
             try {
               synchronized (cQuery) {
@@ -1644,7 +1441,7 @@ public final class CqServiceImpl implements CqService {
                 cQuery.markAsDestroyedInCqResultKeys(eventKey);
               }
             }
-          } // end synchronized(cQuery)
+          }
 
           // Get the matching CQs if any.
           // synchronized (this.matchingCqMap){
@@ -1663,7 +1460,6 @@ public final class CqServiceImpl implements CqService {
               }
             }
           }
-          // }
         }
 
         if (cqEvent != null && cQuery.isRunning()) {
@@ -1694,153 +1490,35 @@ public final class CqServiceImpl implements CqService {
     } // iteration over Profiles.
   }
 
-
-  /*
-   * public void processEvents (EnumListenerEvent operation, CacheEvent event, ClientUpdateMessage
-   * clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds) throws CqException
-   * {
-   * 
-   * //Is this a region event or an entry event if (event instanceof RegionEvent){
-   * processRegionEvent(operation, event, clientMessage, clientIds); } else { processEntryEvent
-   * (operation, event, clientMessage, clientIds); }
-   * 
-   * }
-   * 
-   * private void processRegionEvent(EnumListenerEvent operation, CacheEvent event,
-   * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
-   * throws CqException {
-   * 
-   * if (logger.isDebugEnabled()) { logger.debug("Processing region event for region " +
-   * ((LocalRegion)(event.getRegion())).getName()); } HashMap filteredCqs = new HashMap(); Integer
-   * cqRegionEvent = generateCqRegionEvent(operation); Iterator it =
-   * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
-   * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); CM cqsToBooleans =
-   * (CM)me.getValue(); if (cqsToBooleans == null) { continue; } Set<CqQuery> cqs =
-   * cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } filteredCqs.clear(); Iterator cqIt =
-   * cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery = (CqQueryImpl)cqIt.next(); if
-   * (operation == EnumListenerEvent.AFTER_REGION_DESTROY) { try { if (logger.isDebugEnabled()){
-   * logger.debug("Closing CQ on region destroy event. CqName :" + cQuery.getName()); }
-   * cQuery.close(false); } catch (Exception ex) {
-   * logger.debug("Failed to Close CQ on region destroy. CqName :" + cQuery.getName(), ex); }
-   * 
-   * } filteredCqs.put(cQuery.cqName, cqRegionEvent);
-   * cQuery.getVsdStats().updateStats(cqRegionEvent);
-   * 
-   * } if (!filteredCqs.isEmpty()){ ((ClientUpdateMessageImpl)clientMessage).addClientCqs( clientId,
-   * filteredCqs); }
-   * 
-   * }
-   * 
-   * }
-   * 
-   * private void processEntryEvent(EnumListenerEvent operation, CacheEvent event,
-   * ClientUpdateMessage clientMessage, CM<ClientProxyMembershipID, CM<CqQuery, Boolean>> clientIds)
-   * throws CqException { HashSet cqUnfilteredEventsSet_newValue = new HashSet(); HashSet
-   * cqUnfilteredEventsSet_oldValue = new HashSet(); boolean b_cqResults_newValue = false; boolean
-   * b_cqResults_oldValue = false; EntryEvent entryEvent = (EntryEvent)event; Object eventKey =
-   * entryEvent.getKey(); if (operation == EnumListenerEvent.AFTER_CREATE || operation ==
-   * EnumListenerEvent.AFTER_UPDATE) { if (entryEvent.getNewValue() != null) { //We have a new value
-   * to run the query on cqUnfilteredEventsSet_newValue.clear();
-   * cqUnfilteredEventsSet_newValue.add(entryEvent.getNewValue()); } }
-   * 
-   * HashMap matchedCqs = new HashMap(); long executionStartTime = 0; Iterator it =
-   * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next();
-   * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); if
-   * (logger.isDebugEnabled()) { logger.debug("Processing event for CQ filter, ClientId : " +
-   * clientId); } CM cqsToBooleans = (CM)me.getValue(); if (cqsToBooleans == null) { continue; }
-   * Set<CqQuery> cqs = cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } HashMap filteredCqs
-   * = new HashMap(); Iterator cqIt = cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery =
-   * (CqQueryImpl)cqIt.next(); b_cqResults_newValue = false; b_cqResults_oldValue = false; if
-   * (cQuery == null || !(cQuery.isRunning())){ continue; } String cqName =
-   * cQuery.getServerCqName(); Integer cqEvent = null; if (matchedCqs.containsKey(cqName)) { if
-   * (logger.isDebugEnabled()){ logger.
-   * debug("Similar cq/query is already processed, getting the cq event-type from the matched cq.");
-   * } cqEvent = (Integer)matchedCqs.get(cqName); } else { boolean error = false; boolean
-   * hasSeenEvent = false; HashSet cqEventKeys = null; synchronized (cQuery) { try { // Apply query
-   * on new value. if (!cqUnfilteredEventsSet_newValue.isEmpty()) { executionStartTime =
-   * this.stats.startCqQueryExecution(); b_cqResults_newValue = evaluateQuery(cQuery, new Object[]
-   * {cqUnfilteredEventsSet_newValue}); this.stats.endCqQueryExecution(executionStartTime); } //
-   * Check if old value is cached, if not apply query on old value. if (cqToCqEventKeysMap != null)
-   * { synchronized (cqToCqEventKeysMap) { if ((cqEventKeys =
-   * (HashSet)cqToCqEventKeysMap.get(cqName)) != null) { hasSeenEvent =
-   * cqEventKeys.contains(eventKey); } } } if (!hasSeenEvent) { // get the oldValue. // In case of
-   * Update, destroy and invalidate. if (operation == EnumListenerEvent.AFTER_UPDATE || operation ==
-   * EnumListenerEvent.AFTER_DESTROY || operation == EnumListenerEvent.AFTER_INVALIDATE) { if
-   * (entryEvent.getOldValue() != null) { cqUnfilteredEventsSet_oldValue.clear();
-   * cqUnfilteredEventsSet_oldValue.add(entryEvent.getOldValue()); // Apply query on old value.
-   * executionStartTime = this.stats.startCqQueryExecution(); b_cqResults_oldValue =
-   * evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue});
-   * this.stats.endCqQueryExecution(executionStartTime); } } } } catch (Exception ex) { //Any
-   * exception in running the query // should be caught here and buried //because this code is
-   * running inline with the //message processing code and we don't want to //kill that thread error
-   * = true; logger.info( LocalizedStrings.
-   * CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_CLIENTID_2_ERROR_3, new
-   * Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), clientId,
-   * ex.getLocalizedMessage()}); }
-   * 
-   * if (error) { cqEvent = Integer.valueOf(MessageType.EXCEPTION); } else { if
-   * (b_cqResults_newValue) { if (hasSeenEvent || b_cqResults_oldValue) { cqEvent =
-   * Integer.valueOf(MessageType.LOCAL_UPDATE); } else { cqEvent =
-   * Integer.valueOf(MessageType.LOCAL_CREATE); } // If its create and caching is enabled, cache the
-   * key for this CQ. if (!hasSeenEvent && cqEventKeys != null) { cqEventKeys.add(eventKey); } }
-   * else if (hasSeenEvent || (b_cqResults_oldValue)) { // Base invalidate operation is treated as
-   * destroy. // When the invalidate comes through, the entry will no longer satisfy // the query
-   * and will need to be deleted. cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY); // If
-   * caching is enabled, remove this event's key from the cache. if (hasSeenEvent && cqEventKeys !=
-   * null) { cqEventKeys.remove(eventKey); } } }
-   * 
-   * } //end synchronized(cQuery)
-   * 
-   * // Get the matching CQs if any. synchronized (this.matchingCqMap){ String query =
-   * cQuery.getQueryString(); ArrayList matchingCqs = (ArrayList)matchingCqMap.get(query); if
-   * (matchingCqs != null) { Iterator iter = matchingCqs.iterator(); while (iter.hasNext()) { String
-   * matchingCqName = (String)iter.next(); if (!matchingCqName.equals(cqName)){
-   * matchedCqs.put(matchingCqName, cqEvent); } } } }
-   * 
-   * }
-   * 
-   * if (cqEvent != null){ if (logger.isDebugEnabled()) {
-   * logger.debug("Event is added for the CQ, CqName (clientside): " + cQuery.cqName +
-   * " With CQ Op : " + cqEvent + " for Client : " + clientId); } filteredCqs.put(cQuery.cqName,
-   * cqEvent); cQuery.getVsdStats().updateStats(cqEvent); }
-   * 
-   * } // iteration over cqsToBooleans.keySet() if (!filteredCqs.isEmpty()){
-   * logger.debug("Adding event map for client : "+clientId +
-   * " with event map size : "+filteredCqs.size());
-   * ((ClientUpdateMessageImpl)clientMessage).addClientCqs(clientId, filteredCqs); } } // iteration
-   * over clientIds.entrySet() }
-   */
-
   private Integer generateCqRegionEvent(CacheEvent event) {
     Integer cqEvent = null;
     if (event.getOperation().isRegionDestroy()) {
-      cqEvent = Integer.valueOf(MessageType.DESTROY_REGION);
+      cqEvent = MessageType.DESTROY_REGION;
     } else if (event.getOperation().isRegionInvalidate()) {
-      cqEvent = Integer.valueOf(MessageType.INVALIDATE_REGION);
+      cqEvent = MessageType.INVALIDATE_REGION;
     } else if (event.getOperation().isClear()) {
-      cqEvent = Integer.valueOf(MessageType.CLEAR_REGION);
+      cqEvent = MessageType.CLEAR_REGION;
     }
     return cqEvent;
   }
 
-
   /**
    * Manages the CQs created for the base region. This is managed here, instead of on the base
    * region; since the cq could be created on the base region, before base region is created (using
    * newCq()).
    */
-  public void addToBaseRegionToCqNameMap(String regionName, String cqName) {
+  private void addToBaseRegionToCqNameMap(String regionName, String cqName) {
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
       if (cqs == null) {
-        cqs = new ArrayList<String>();
+        cqs = new ArrayList<>();
       }
       cqs.add(cqName);
       this.baseRegionToCqNameMap.put(regionName, cqs);
     }
   }
 
-  public void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
+  void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
     synchronized (this.baseRegionToCqNameMap) {
       ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
       if (cqs != null) {
@@ -1864,37 +1542,12 @@ public final class CqServiceImpl implements CqService {
   }
 
   /**
-   * Removes this CQ from CQ event Cache map. This disables the caching events for this CQ.
-   * 
-   * @param cqName
-   */
-  /*
-   * synchronized public void removeCQFromCaching(String cqName){ if (cqToCqEventKeysMap != null) {
-   * // Take a lock on CqQuery object. In processEvents the maps are // handled under CqQuery
-   * object. if (cqToCqEventKeysMap != null){ synchronized (cqToCqEventKeysMap) {
-   * cqToCqEventKeysMap.remove(cqName); } } } }
-   */
-
-  /**
-   * Returns the CQ event cache map.
-   * 
-   * @return HashMap cqToCqEventKeysMap
-   * 
-   *         Caller must synchronize on the returned value in order to inspect.
-   */
-  /*
-   * public HashMap getCqToCqEventKeysMap(){ return cqToCqEventKeysMap; }
-   */
-
-  /**
    * Adds the query from the given CQ to the matched CQ map.
-   * 
-   * @param cq
    */
-  public void addToMatchingCqMap(CqQueryImpl cq) {
+  void addToMatchingCqMap(CqQueryImpl cq) {
     synchronized (this.matchingCqMap) {
       String cqQuery = cq.getQueryString();
-      Set<String> matchingCQs = null;
+      Set<String> matchingCQs;
       if (!matchingCqMap.containsKey(cqQuery)) {
         matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap());
         matchingCqMap.put(cqQuery, matchingCQs);
@@ -1912,10 +1565,8 @@ public final class CqServiceImpl implements CqService {
 
   /**
    * Removes the query from the given CQ from the matched CQ map.
-   * 
-   * @param cq
    */
-  public void removeFromMatchingCqMap(CqQueryImpl cq) {
+  private void removeFromMatchingCqMap(CqQueryImpl cq) {
     synchronized (this.matchingCqMap) {
       String cqQuery = cq.getQueryString();
       if (matchingCqMap.containsKey(cqQuery)) {
@@ -1947,10 +1598,6 @@ public final class CqServiceImpl implements CqService {
    * Applies the query on the event. This method takes care of the performance related changed done
    * to improve the CQ-query performance. When CQ-query is executed first time, it saves the query
    * related information in the execution context and uses that info in later executions.
-   * 
-   * @param cQuery
-   * @param event
-   * @return boolean
    */
   private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception {
     ExecutionContext execContext = cQuery.getQueryExecutionContext();
@@ -1983,19 +1630,6 @@ public final class CqServiceImpl implements CqService {
     return this.cqNameToUserAttributesMap.get(cqName);
   }
 
-  // public static void memberLeft(String poolName) {
-  // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
-  // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
-  // }
-  // }
-  //
-  // public static void memberCrashed(String poolName) {
-  // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) {
-  // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName);
-  // }
-  // }
-  //
-
   @Override
   public void cqsDisconnected(Pool pool) {
     invokeCqsConnected(pool, false);
@@ -2014,7 +1648,7 @@ public final class CqServiceImpl implements CqService {
     // Check to see if we are already connected/disconnected.
     // If state has not changed, do not invoke another connected/disconnected
     synchronized (cqPoolsConnected) {
-      // don't repeatily send same connect/disconnect message to cq's on repeated fails of
+      // don't repeatedly send same connect/disconnect message to cq's on repeated fails of
       // RedundancySatisfier
       if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) {
         return;
@@ -2059,13 +1693,6 @@ public final class CqServiceImpl implements CqService {
           SystemFailure.checkFailure();
           logger.warn(LocalizedMessage
               .create(LocalizedStrings.CqService_ERROR_SENDING_CQ_CONNECTION_STATUS, cqName), t);
-
-          if (t instanceof VirtualMachineError) {
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0,
-                cqName), t);
-            return;
-          }
         }
       }
     }
@@ -2075,7 +1702,4 @@ public final class CqServiceImpl implements CqService {
   public List<String> getAllDurableCqsFromServer(InternalPool pool) {
     return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer();
   }
-
-
 }
-

http://git-wip-us.apache.org/repos/asf/geode/blob/bc39e973/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
index ba71143..a675162 100644
--- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
+++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java
@@ -14,11 +14,9 @@
  */
 package org.apache.geode.cache.query.internal.cq;
 
-import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.query.CqServiceStatistics;
 import org.apache.geode.cache.query.CqQuery;
 import org.apache.geode.cache.query.internal.DefaultQueryService;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 
 /**
  * Provides statistical information about CqService.
@@ -26,24 +24,22 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
  * @since GemFire 5.5
  */
 public class CqServiceStatisticsImpl implements CqServiceStatistics {
+
   private CqServiceImpl cqService;
-  // private long activeCqs;
-  // private long stoppedCqs;
-  // private long closedCqs;
-  // private long createdCqs;
 
   /**
    * Constructor for CqStatisticsImpl
    * 
    * @param cqs - CqService
    */
-  public CqServiceStatisticsImpl(CqServiceImpl cqs) {
+  CqServiceStatisticsImpl(CqServiceImpl cqs) {
     cqService = cqs;
   }
 
   /**
    * Returns the number of CQs currently executing
    */
+  @Override
   public long numCqsActive() {
     return this.cqService.getCqServiceVsdStats().getNumCqsActive();
   }
@@ -53,6 +49,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
    * 
    * @return long number of cqs created.
    */
+  @Override
   public long numCqsCreated() {
     return this.cqService.getCqServiceVsdStats().getNumCqsCreated();
   }
@@ -60,6 +57,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of Cqs that are closed.
    */
+  @Override
   public long numCqsClosed() {
     return this.cqService.getCqServiceVsdStats().getNumCqsClosed();
   }
@@ -67,6 +65,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of Cqs that are stopped.
    */
+  @Override
   public long numCqsStopped() {
     return this.cqService.getCqServiceVsdStats().getNumCqsStopped();
   }
@@ -74,20 +73,18 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics {
   /**
    * Returns number of CQs created from the client.
    */
+  @Override
   public long numCqsOnClient() {
     return this.cqService.getCqServiceVsdStats().getNumCqsOnClient();
   }
 
   /**
    * Returns the number of CQs (active + suspended) on the given region.
-   * 
-   * @param regionName
    */
+  @Override
   public long numCqsOnRegion(String regionName) {
-
     DefaultQueryService queryService =
-        (DefaultQueryService) ((GemFireCacheImpl) CacheFactory.getAnyInstance())
-            .getLocalQueryService();
+        (DefaultQueryService) cqService.getInternalCache().getLocalQueryService();
     try {
       CqQuery[] cqs = queryService.getCqs(regionName);
 


Mime
View raw message