Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 003CA200C88 for ; Thu, 27 Apr 2017 20:47:41 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F3033160BBE; Thu, 27 Apr 2017 18:47:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B5794160BA7 for ; Thu, 27 Apr 2017 20:47:37 +0200 (CEST) Received: (qmail 62149 invoked by uid 500); 27 Apr 2017 18:47:36 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 62056 invoked by uid 99); 27 Apr 2017 18:47:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Apr 2017 18:47:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ADA03DF989; Thu, 27 Apr 2017 18:47:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhouxj@apache.org To: commits@geode.apache.org Date: Thu, 27 Apr 2017 18:47:39 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/40] geode git commit: GEODE-2632: refactor code to use InternalCache instead of GemFireCacheImpl archived-at: Thu, 27 Apr 2017 18:47:41 -0000 GEODE-2632: refactor code to use InternalCache instead of GemFireCacheImpl * minor cleanup also Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/363e50d2 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/363e50d2 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/363e50d2 Branch: refs/heads/feature/GEM-1299 Commit: 363e50d213d763f4cca6e0744b206941a4f2d52c Parents: 0862174 Author: Kirk Lund Authored: Wed Apr 19 14:41:42 2017 -0700 Committer: Kirk Lund Committed: Fri Apr 21 13:45:22 2017 -0700 ---------------------------------------------------------------------- .../query/internal/cq/CqServiceProvider.java | 22 +- .../query/internal/cq/spi/CqServiceFactory.java | 8 +- .../cache/query/internal/cq/ClientCQImpl.java | 95 +-- .../cache/query/internal/cq/CqQueryImpl.java | 91 ++- .../query/internal/cq/CqServiceFactoryImpl.java | 17 +- .../cache/query/internal/cq/CqServiceImpl.java | 673 ++++--------------- .../internal/cq/CqServiceStatisticsImpl.java | 21 +- .../query/internal/cq/CqServiceVsdStats.java | 73 +- .../cache/query/internal/cq/ServerCQImpl.java | 121 +--- .../cache/tier/sockets/command/ExecuteCQ.java | 4 +- .../cache/tier/sockets/command/ExecuteCQ61.java | 4 +- .../tier/sockets/command/GetDurableCQs.java | 4 +- .../cache/query/cq/dunit/CqStatsDUnitTest.java | 44 +- .../cq/dunit/CqStatsUsingPoolDUnitTest.java | 47 +- .../TopEntriesFunctionCollector.java | 22 +- .../LuceneQueryFunctionJUnitTest.java | 54 +- .../TopEntriesCollectorJUnitTest.java | 23 +- .../TopEntriesFunctionCollectorJUnitTest.java | 48 +- .../distributed/TopEntriesJUnitTest.java | 44 +- .../cache/lucene/test/LuceneTestUtilities.java | 16 + 20 files changed, 442 insertions(+), 989 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java index cded9c3..90fbf4b 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceProvider.java @@ -16,7 +16,7 @@ package org.apache.geode.cache.query.internal.cq; import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory; import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import java.io.DataInput; import java.io.IOException; @@ -26,17 +26,19 @@ import java.util.ServiceLoader; public class CqServiceProvider { private static final CqServiceFactory factory; - // System property to maintain the CQ event references for optimizing the updates. - // This will allows to run the CQ query only once during update events. + + /** + * System property to maintain the CQ event references for optimizing the updates. This will allow + * running the CQ query only once during update events. + */ public static boolean MAINTAIN_KEYS = Boolean - .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true")) - .booleanValue(); + .valueOf(System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.MAINTAIN_KEYS", "true")); + /** * A debug flag used for testing vMotion during CQ registration */ public static boolean VMOTION_DURING_CQ_REGISTRATION_FLAG = false; - static { ServiceLoader loader = ServiceLoader.load(CqServiceFactory.class); Iterator itr = loader.iterator(); @@ -48,8 +50,7 @@ public class CqServiceProvider { } } - public static CqService create(GemFireCacheImpl cache) { - + public static CqService create(InternalCache cache) { if (factory == null) { return new MissingCqService(); } @@ -63,10 +64,7 @@ public class CqServiceProvider { } else { return factory.readCqQuery(in); } - } - private CqServiceProvider() { - - } + private CqServiceProvider() {} } http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java index 68ebbd5..2b8a47e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java +++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/cq/spi/CqServiceFactory.java @@ -19,16 +19,16 @@ import java.io.IOException; import org.apache.geode.cache.query.internal.cq.CqService; import org.apache.geode.cache.query.internal.cq.ServerCQ; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; public interface CqServiceFactory { - public void initialize(); + void initialize(); /** * Create a new CqService for the given cache */ - public CqService create(GemFireCacheImpl cache); + CqService create(InternalCache cache); - public ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException; + ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException; } http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java index 00a0aa5..111bf84 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/ClientCQImpl.java @@ -35,7 +35,7 @@ import org.apache.geode.cache.query.CqResults; import org.apache.geode.cache.query.CqStatusListener; import org.apache.geode.cache.query.RegionNotFoundException; import org.apache.geode.cache.query.internal.CqStateImpl; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; @@ -57,7 +57,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { */ private volatile ConcurrentLinkedQueue queuedEvents = null; - public final Object queuedEventsSynchObject = new Object(); + final Object queuedEventsSynchObject = new Object(); private boolean connected = false; @@ -73,22 +73,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { return this.cqName; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCQProxy() - */ - public ServerCQProxyImpl getCQProxy() { + ServerCQProxyImpl getCQProxy() { return this.cqProxy; } /** * Initializes the connection using the pool from the client region. Also sets the cqBaseRegion * value of this CQ. - * - * @throws CqException */ - public void initConnectionProxy() throws CqException, RegionNotFoundException { + private void initConnectionProxy() throws CqException, RegionNotFoundException { cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName); // Check if the region exists on the local cache. // In the current implementation of 5.1 the Server Connection is (ConnectionProxyImpl) @@ -113,17 +106,9 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { throw new CqException( "Unable to get the connection pool. The Region does not have a pool configured."); } - - // if (proxy == null) { - // throw new - // CqException(LocalizedStrings.CqQueryImpl_UNABLE_TO_GET_THE_CONNECTIONPROXY_THE_REGION_MAY_NOT_HAVE_A_BRIDGEWRITER_OR_BRIDGECLIENT_INSTALLED_ON_IT.toLocalizedString()); - // } else if(!proxy.getEstablishCallbackConnection()){ - // throw new - // CqException(LocalizedStrings.CqQueryImpl_THE_ESTABLISHCALLBACKCONNECTION_ON_BRIDGEWRITER_CLIENT_INSTALLED_ON_REGION_0_IS_SET_TO_FALSE - // .toLocalizedString(regionName)); - // } } + @Override public void close() throws CqClosedException, CqException { this.close(true); } @@ -182,15 +167,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { if (cqProxy == null || !sendRequestToServer || isClosed) { // Stat update. if (stateBeforeClosing == CqStateImpl.RUNNING) { - cqService.stats.decCqsActive(); + cqService.stats().decCqsActive(); } else if (stateBeforeClosing == CqStateImpl.STOPPED) { - cqService.stats.decCqsStopped(); + cqService.stats().decCqsStopped(); } // Set the state to close, and update stats this.cqState.setState(CqStateImpl.CLOSED); - cqService.stats.incCqsClosed(); - cqService.stats.decCqsOnClient(); + cqService.stats().incCqsClosed(); + cqService.stats().decCqsOnClient(); if (this.stats != null) this.stats.close(); } else { @@ -201,7 +186,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { if (exception != null) { throw new CqException( LocalizedStrings.CqQueryImpl_FAILED_TO_CLOSE_THE_CQ_CQNAME_0_ERROR_FROM_LAST_ENDPOINT_1 - .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}), + .toLocalizedString(this.cqName, exception.getLocalizedMessage()), exception.getCause()); } else { throw new CqException( @@ -261,31 +246,28 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { /** * Clears the resource used by CQ. - * - * @throws CqException */ + @Override protected void cleanup() throws CqException { this.cqService.removeFromBaseRegionToCqNameMap(this.regionName, this.getServerCqName()); } + @Override public CqAttributes getCqAttributes() { return cqAttributes; } - - /** * @return Returns the cqListeners. */ public CqListener[] getCqListeners() { - return cqAttributes.getCqListeners(); } - /** * Start or resume executing the query. */ + @Override public void execute() throws CqClosedException, RegionNotFoundException, CqException { executeCqOnRedundantsAndPrimary(false); } @@ -293,7 +275,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { /** * Start or resume executing the query. Gets or updates the CQ results and returns them. */ - public CqResults executeWithInitialResults() + @Override + public CqResults executeWithInitialResults() throws CqClosedException, RegionNotFoundException, CqException { synchronized (queuedEventsSynchObject) { @@ -320,16 +303,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { CqResults initialResults; try { initialResults = (CqResults) executeCqOnRedundantsAndPrimary(true); - } catch (CqClosedException e) { - queuedEvents = null; - throw e; - } catch (RegionNotFoundException e) { - queuedEvents = null; - throw e; - } catch (CqException e) { - queuedEvents = null; - throw e; - } catch (RuntimeException e) { + } catch (RegionNotFoundException | CqException | RuntimeException e) { queuedEvents = null; throw e; } @@ -343,6 +317,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { if (!this.queuedEvents.isEmpty()) { try { Runnable r = new Runnable() { + @Override public void run() { Object[] eventArray = null; if (CqQueryImpl.testHook != null) { @@ -395,7 +370,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { * @param executeWithInitialResults boolean * @return Object SelectResults in case of executeWithInitialResults */ - public Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults) + private Object executeCqOnRedundantsAndPrimary(boolean executeWithInitialResults) throws CqClosedException, RegionNotFoundException, CqException { Object initialResults = null; @@ -461,8 +436,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { } else { String errMsg = LocalizedStrings.CqQueryImpl_FAILED_TO_EXECUTE_THE_CQ_CQNAME_0_QUERY_STRING_IS_1_ERROR_FROM_LAST_SERVER_2 - .toLocalizedString( - new Object[] {this.cqName, this.queryString, ex.getLocalizedMessage()}); + .toLocalizedString(this.cqName, this.queryString, ex.getLocalizedMessage()); if (logger.isDebugEnabled()) { logger.debug(errMsg, ex); } @@ -498,8 +472,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { } } // Update CQ-base region for book keeping. - this.cqService.stats.incCqsActive(); - this.cqService.stats.decCqsStopped(); + this.cqService.stats().incCqsActive(); + this.cqService.stats().decCqsStopped(); return initialResults; } @@ -509,23 +483,22 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { * @return true if shutdown in progress else false. */ private boolean shutdownInProgress() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = cqService.getInternalCache(); if (cache == null || cache.isClosed()) { return true; // bail, things are shutting down } - String reason = cqProxy.getPool().getCancelCriterion().cancelInProgress(); if (reason != null) { return true; } return false; - } /** * Stop or pause executing the query. */ + @Override public void stop() throws CqClosedException, CqException { boolean isStopped = false; synchronized (this.cqState) { @@ -558,8 +531,8 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { if (cqProxy == null || isStopped) { // Change state and stats on the client side this.cqState.setState(CqStateImpl.STOPPED); - this.cqService.stats.incCqsStopped(); - this.cqService.stats.decCqsActive(); + this.cqService.stats().incCqsStopped(); + this.cqService.stats().decCqsActive(); if (logger.isDebugEnabled()) { logger.debug("Successfully stopped the CQ. {}", cqName); } @@ -568,7 +541,7 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { if (exception != null) { throw new CqException( LocalizedStrings.CqQueryImpl_FAILED_TO_STOP_THE_CQ_CQNAME_0_ERROR_FROM_LAST_SERVER_1 - .toLocalizedString(new Object[] {this.cqName, exception.getLocalizedMessage()}), + .toLocalizedString(this.cqName, exception.getLocalizedMessage()), exception.getCause()); } else { throw new CqException( @@ -579,24 +552,15 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { } } + @Override public CqAttributesMutator getCqAttributesMutator() { return (CqAttributesMutator) this.cqAttributes; } - - public ConcurrentLinkedQueue getQueuedEvents() { + ConcurrentLinkedQueue getQueuedEvents() { return this.queuedEvents; } - - - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqQuery2#setProxyCache(org.apache.geode.cache. - * client.internal.ProxyCache) - */ @Override public void setProxyCache(ProxyCache proxyCache) { this.proxyCache = proxyCache; @@ -612,7 +576,6 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { @Override public void createOn(Connection conn, boolean isDurable) { - byte regionDataPolicyOrdinal = getCqBaseRegion() == null ? (byte) 0 : getCqBaseRegion().getAttributes().getDataPolicy().ordinal; @@ -620,6 +583,4 @@ public class ClientCQImpl extends CqQueryImpl implements ClientCQ { this.cqProxy.createOn(getName(), conn, getQueryString(), state, isDurable, regionDataPolicyOrdinal); } - - } http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java index 22b4137..07e3171 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqQueryImpl.java @@ -21,11 +21,9 @@ import java.util.Set; import org.apache.logging.log4j.Logger; import org.apache.geode.StatisticsFactory; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.query.CqClosedException; import org.apache.geode.cache.query.CqEvent; import org.apache.geode.cache.query.CqException; -import org.apache.geode.cache.query.CqExistsException; import org.apache.geode.cache.query.CqState; import org.apache.geode.cache.query.CqStatistics; import org.apache.geode.cache.query.Query; @@ -38,7 +36,7 @@ import org.apache.geode.cache.query.internal.CqStateImpl; import org.apache.geode.cache.query.internal.DefaultQuery; import org.apache.geode.cache.query.internal.ExecutionContext; import org.apache.geode.cache.query.internal.QueryExecutionContext; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.InternalLogWriter; @@ -58,13 +56,13 @@ public abstract class CqQueryImpl implements InternalCqQuery { protected String queryString; - protected static final Object TOKEN = new Object(); + static final Object TOKEN = new Object(); - protected LocalRegion cqBaseRegion; + LocalRegion cqBaseRegion; protected Query query = null; - protected InternalLogWriter securityLogWriter; + InternalLogWriter securityLogWriter; protected CqServiceImpl cqService; @@ -72,14 +70,14 @@ public abstract class CqQueryImpl implements InternalCqQuery { protected boolean isDurable = false; - // Stats counters - protected CqStatisticsImpl cqStats; + /** Stats counters */ + private CqStatisticsImpl cqStats; protected CqQueryVsdStats stats; protected final CqStateImpl cqState = new CqStateImpl(); - protected ExecutionContext queryExecutionContext = null; + private ExecutionContext queryExecutionContext = null; public static TestHook testHook = null; @@ -100,6 +98,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { /** * returns CQ name */ + @Override public String getName() { return this.cqName; } @@ -109,6 +108,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { this.cqName = cqName; } + @Override public void setCqService(CqService cqService) { this.cqService = (CqServiceImpl) cqService; } @@ -121,25 +121,24 @@ public abstract class CqQueryImpl implements InternalCqQuery { return this.regionName; } - public void updateCqCreateStats() { + void updateCqCreateStats() { // Initialize the VSD statistics StatisticsFactory factory = cqService.getCache().getDistributedSystem(); this.stats = new CqQueryVsdStats(factory, getServerCqName()); this.cqStats = new CqStatisticsImpl(this); // Update statistics with CQ creation. - this.cqService.stats.incCqsStopped(); - this.cqService.stats.incCqsCreated(); - this.cqService.stats.incCqsOnClient(); + this.cqService.stats().incCqsStopped(); + this.cqService.stats().incCqsCreated(); + this.cqService.stats().incCqsOnClient(); } /** * Validates the CQ. Checks for cq constraints. Also sets the base region name. */ - public void validateCq() { - Cache cache = cqService.getCache(); - DefaultQuery locQuery = - (DefaultQuery) ((GemFireCacheImpl) cache).getLocalQueryService().newQuery(this.queryString); + void validateCq() { + InternalCache cache = cqService.getInternalCache(); + DefaultQuery locQuery = (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString); this.query = locQuery; // assert locQuery != null; @@ -221,10 +220,8 @@ public abstract class CqQueryImpl implements InternalCqQuery { /** * Removes the CQ from CQ repository. - * - * @throws CqException */ - protected void removeFromCqMap() throws CqException { + void removeFromCqMap() throws CqException { try { cqService.removeCq(this.getServerCqName()); } catch (Exception ex) { @@ -243,6 +240,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { /** * Returns the QueryString of this CQ. */ + @Override public String getQueryString() { return queryString; } @@ -252,23 +250,16 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @return the Query for the query string */ + @Override public Query getQuery() { return query; } - - /** - * @see org.apache.geode.cache.query.CqQuery#getStatistics() - */ + @Override public CqStatistics getStatistics() { return cqStats; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqQuery2#getCqBaseRegion() - */ @Override public LocalRegion getCqBaseRegion() { return this.cqBaseRegion; @@ -279,11 +270,12 @@ public abstract class CqQueryImpl implements InternalCqQuery { /** * @return Returns the Region name on which this cq is created. */ - public String getBaseRegionName() { + String getBaseRegionName() { return this.regionName; } + @Override public abstract String getServerCqName(); /** @@ -291,15 +283,11 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @return STOPPED RUNNING or CLOSED */ + @Override public CqState getState() { return this.cqState; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqQuery2#setCqState(int) - */ @Override public void setCqState(int state) { if (this.isClosed()) { @@ -309,18 +297,13 @@ public abstract class CqQueryImpl implements InternalCqQuery { synchronized (cqState) { if (state == CqStateImpl.RUNNING) { - if (this.isRunning()) { - // throw new - // IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0 - // .toLocalizedString(this.cqName)); - } this.cqState.setState(CqStateImpl.RUNNING); - this.cqService.stats.incCqsActive(); - this.cqService.stats.decCqsStopped(); + this.cqService.stats().incCqsActive(); + this.cqService.stats().decCqsStopped(); } else if (state == CqStateImpl.STOPPED) { this.cqState.setState(CqStateImpl.STOPPED); - this.cqService.stats.incCqsStopped(); - this.cqService.stats.decCqsActive(); + this.cqService.stats().incCqsStopped(); + this.cqService.stats().decCqsActive(); } else if (state == CqStateImpl.CLOSING) { this.cqState.setState(state); } @@ -332,7 +315,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @param cqEvent object */ - public void updateStats(CqEvent cqEvent) { + void updateStats(CqEvent cqEvent) { this.stats.updateStats(cqEvent); // Stats for VSD } @@ -341,15 +324,17 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @return true if running, false otherwise */ + @Override public boolean isRunning() { return this.cqState.isRunning(); } /** - * Return true if the CQ is in Sstopped state + * Return true if the CQ is in stopped state * * @return true if stopped, false otherwise */ + @Override public boolean isStopped() { return this.cqState.isStopped(); } @@ -359,6 +344,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @return true if closed, false otherwise */ + @Override public boolean isClosed() { return this.cqState.isClosed(); } @@ -377,6 +363,7 @@ public abstract class CqQueryImpl implements InternalCqQuery { * * @return true if durable, false otherwise */ + @Override public boolean isDurable() { return this.isDurable; } @@ -391,22 +378,22 @@ public abstract class CqQueryImpl implements InternalCqQuery { return stats; } - public ExecutionContext getQueryExecutionContext() { + ExecutionContext getQueryExecutionContext() { return queryExecutionContext; } - public void setQueryExecutionContext(ExecutionContext queryExecutionContext) { + private void setQueryExecutionContext(ExecutionContext queryExecutionContext) { this.queryExecutionContext = queryExecutionContext; } /** Test Hook */ public interface TestHook { - public void pauseUntilReady(); + void pauseUntilReady(); - public void ready(); + void ready(); - public int numQueuedEvents(); + int numQueuedEvents(); - public void setEventCount(int count); + void setEventCount(int count); } } http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java index db90632..9cc2eea 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceFactoryImpl.java @@ -22,7 +22,7 @@ import java.util.Map; import org.apache.geode.cache.query.internal.cq.spi.CqServiceFactory; import org.apache.geode.internal.Version; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.CommandInitializer; @@ -36,14 +36,13 @@ import org.apache.geode.internal.cache.tier.sockets.command.StopCQ; public class CqServiceFactoryImpl implements CqServiceFactory { + @Override public void initialize() { - { - Map versions = new HashMap(); - versions.put(Version.GFE_57, ExecuteCQ.getCommand()); - versions.put(Version.GFE_61, ExecuteCQ61.getCommand()); - CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions); - CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions); - } + Map versions = new HashMap<>(); + versions.put(Version.GFE_57, ExecuteCQ.getCommand()); + versions.put(Version.GFE_61, ExecuteCQ61.getCommand()); + CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions); + CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions); CommandInitializer.registerCommand(MessageType.GETCQSTATS_MSG_TYPE, Collections.singletonMap(Version.GFE_57, GetCQStats.getCommand())); @@ -58,7 +57,7 @@ public class CqServiceFactoryImpl implements CqServiceFactory { } @Override - public CqService create(GemFireCacheImpl cache) { + public CqService create(InternalCache cache) { return new CqServiceImpl(cache); } http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java index f1ca832..570c06c 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceImpl.java @@ -14,19 +14,63 @@ */ package org.apache.geode.cache.query.internal.cq; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.InvalidDeltaException; import org.apache.geode.StatisticsFactory; import org.apache.geode.SystemFailure; -import org.apache.geode.cache.*; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheEvent; +import org.apache.geode.cache.CacheLoaderException; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.TimeoutException; import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.client.internal.*; -import org.apache.geode.cache.query.*; -import org.apache.geode.cache.query.internal.*; +import org.apache.geode.cache.client.internal.GetEventValueOp; +import org.apache.geode.cache.client.internal.InternalPool; +import org.apache.geode.cache.client.internal.QueueManager; +import org.apache.geode.cache.client.internal.ServerCQProxyImpl; +import org.apache.geode.cache.client.internal.UserAttributes; +import org.apache.geode.cache.query.CqAttributes; +import org.apache.geode.cache.query.CqClosedException; +import org.apache.geode.cache.query.CqException; +import org.apache.geode.cache.query.CqExistsException; +import org.apache.geode.cache.query.CqListener; +import org.apache.geode.cache.query.CqQuery; +import org.apache.geode.cache.query.CqServiceStatistics; +import org.apache.geode.cache.query.CqStatusListener; +import org.apache.geode.cache.query.QueryException; +import org.apache.geode.cache.query.QueryInvalidException; +import org.apache.geode.cache.query.RegionNotFoundException; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.internal.CompiledSelect; +import org.apache.geode.cache.query.internal.CqQueryVsdStats; +import org.apache.geode.cache.query.internal.CqStateImpl; +import org.apache.geode.cache.query.internal.DefaultQuery; +import org.apache.geode.cache.query.internal.ExecutionContext; import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.i18n.StringId; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; -import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.FilterProfile; +import org.apache.geode.internal.cache.FilterRoutingInfo; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; @@ -35,57 +79,43 @@ import org.apache.geode.internal.cache.tier.sockets.Part; import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.logging.log4j.Logger; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; /** - * @since GemFire 5.5 - * - * Implements the CqService functionality. - * - */ -/** + * Implements the CqService functionality. * + * @since GemFire 5.5 */ public final class CqServiceImpl implements CqService { private static final Logger logger = LogService.getLogger(); - private static final Integer MESSAGE_TYPE_LOCAL_CREATE = - Integer.valueOf(MessageType.LOCAL_CREATE); - private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = - Integer.valueOf(MessageType.LOCAL_UPDATE); - private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = - Integer.valueOf(MessageType.LOCAL_DESTROY); - private static final Integer MESSAGE_TYPE_EXCEPTION = Integer.valueOf(MessageType.EXCEPTION); + private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE; + private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE; + private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY; + private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION; /** * System property to evaluate the query even though the initial results are not required when cq * is executed using the execute() method. */ - public static boolean EXECUTE_QUERY_DURING_INIT = Boolean - .valueOf(System - .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true")) - .booleanValue(); + public static boolean EXECUTE_QUERY_DURING_INIT = Boolean.valueOf(System + .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true")); private static final String CQ_NAME_PREFIX = "GfCq"; - private final Cache cache; + private final InternalCache cache; /** * Manages cq pools to determine if a status of connect or disconnect needs to be sent out */ - private final HashMap cqPoolsConnected = new HashMap(); - + private final HashMap cqPoolsConnected = new HashMap<>(); /** * Manages CQ objects. uses serverCqName as key and CqQueryImpl as value * - * @guarded.By cqQueryMapLock + * GuardedBy cqQueryMapLock */ - private volatile HashMap cqQueryMap = new HashMap(); + private volatile HashMap cqQueryMap = new HashMap<>(); + private final Object cqQueryMapLock = new Object(); private volatile boolean isRunning = false; @@ -93,36 +123,21 @@ public final class CqServiceImpl implements CqService { /** * Used by client when multiuser-authentication is true. */ - private final HashMap cqNameToUserAttributesMap = - new HashMap(); - - // private boolean isServer = true; - - /* - * // Map to manage CQ to satisfied CQ events (keys) for optimizing updates. private final HashMap - * cqToCqEventKeysMap = CqService.MAINTAIN_KEYS ? new HashMap() : null; - */ + private final HashMap cqNameToUserAttributesMap = new HashMap<>(); // Map to manage the similar CQs (having same query - performance optimization). // With query as key and Set of CQs as values. private final ConcurrentHashMap matchingCqMap; // CQ Service statistics - public final CqServiceStatisticsImpl cqServiceStats; - public final CqServiceVsdStats stats; + private final CqServiceStatisticsImpl cqServiceStats; + private final CqServiceVsdStats stats; // CQ identifier, also used in auto generated CQ names private volatile long cqId = 1; - /** - * Used to synchronize access to CQs in the repository - */ - final Object cqSync = new Object(); - /* This is to manage region to CQs map, client side book keeping. */ - private HashMap> baseRegionToCqNameMap = - new HashMap>(); - + private HashMap> baseRegionToCqNameMap = new HashMap<>(); /** * Access and modification to the contents of this map do not necessarily need to be lock @@ -135,33 +150,24 @@ public final class CqServiceImpl implements CqService { /** * Constructor. - * - * @param c The cache used for the service + * + * @param cache The cache used for the service */ - public CqServiceImpl(final Cache c) { - if (c == null) { + public CqServiceImpl(final InternalCache cache) { + if (cache == null) { throw new IllegalStateException(LocalizedStrings.CqService_CACHE_IS_NULL.toLocalizedString()); } - GemFireCacheImpl gfc = (GemFireCacheImpl) c; - gfc.getCancelCriterion().checkCancelInProgress(null); - - this.cache = gfc; + cache.getCancelCriterion().checkCancelInProgress(null); + this.cache = cache; // Initialize the Map which maintains the matching cqs. this.matchingCqMap = new ConcurrentHashMap>(); // Initialize the VSD statistics - StatisticsFactory factory = cache.getDistributedSystem(); + StatisticsFactory factory = this.cache.getDistributedSystem(); this.stats = new CqServiceVsdStats(factory); this.cqServiceStats = new CqServiceStatisticsImpl(this); - - // final LoggingThreadGroup group = - // LoggingThreadGroup.createThreadGroup("CqExecutor Threads", logger); - - // if (this.cache.getCacheServers().isEmpty()) { - // isServer = false; - // } } /** @@ -171,13 +177,14 @@ public final class CqServiceImpl implements CqService { return this.cache; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#newCq(java.lang.String, - * java.lang.String, org.apache.geode.cache.query.CqAttributes, - * org.apache.geode.cache.client.internal.ServerCQProxy, boolean) - */ + public InternalCache getInternalCache() { + return this.cache; + } + + public CqServiceVsdStats stats() { + return this.stats; + } + @Override public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes, InternalPool pool, boolean isDurable) @@ -242,22 +249,15 @@ public final class CqServiceImpl implements CqService { return cQuery; } - /** * Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and * executes. This is called on the Server. * - * @param cqName - * @param queryString - * @param cqState - * @param clientProxyId - * @param ccn * @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN * @param regionDataPolicy the data policy of the region associated with the query. This is only * needed if manageEmptyRegions is true. * @param emptyRegionsMap map of empty regions. * @throws IllegalStateException if this is called at client side. - * @throws CqException */ @Override public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState, @@ -271,7 +271,7 @@ public final class CqServiceImpl implements CqService { } String serverCqName = constructServerCqName(cqName, clientProxyId); - ServerCQImpl cQuery = null; + ServerCQImpl cQuery; // If this CQ is not yet registered in Server, register CQ. if (!isCqExists(serverCqName)) { @@ -292,7 +292,6 @@ public final class CqServiceImpl implements CqService { logger.info(LocalizedMessage.create( LocalizedStrings.CqService_EXCEPTION_WHILE_REGISTERING_CQ_ON_SERVER_CQNAME___0, cQuery.getName())); - cQuery = null; throw cqe; } @@ -308,6 +307,7 @@ public final class CqServiceImpl implements CqService { return cQuery; } + @Override public void resumeCQ(int cqState, ServerCQ cQuery) { // Initialize the state of CQ. if (((CqStateImpl) cQuery.getState()).getState() != cqState) { @@ -324,25 +324,10 @@ public final class CqServiceImpl implements CqService { } } - /* - * public void addToCqEventKeysMap(CqQuery cq){ if (cqToCqEventKeysMap != null) { synchronized - * (cqToCqEventKeysMap){ String serverCqName = ((CqQueryImpl)cq).getServerCqName(); if - * (!cqToCqEventKeysMap.containsKey(serverCqName)){ cqToCqEventKeysMap.put(serverCqName, new - * HashSet()); if (_logger.isDebugEnabled()) { - * _logger.debug("CQ Event key maintenance for CQ, CqName: " + serverCqName + " is Enabled." + - * " key maintenance map size is: " + cqToCqEventKeysMap.size()); } } } // synchronized } } - */ - - public boolean hasCq() { - HashMap cqMap = cqQueryMap; - return (cqMap.size() > 0); - } - - /** * Adds the given CQ and cqQuery object into the CQ map. */ - public void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException { + void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException { // On server side cqName will be server side cqName. String sCqName = cq.getServerCqName(); if (logger.isDebugEnabled()) { @@ -355,7 +340,7 @@ public final class CqServiceImpl implements CqService { .toLocalizedString(sCqName)); } synchronized (cqQueryMapLock) { - HashMap tmpCqQueryMap = new HashMap(cqQueryMap); + HashMap tmpCqQueryMap = new HashMap<>(cqQueryMap); try { tmpCqQueryMap.put(sCqName, cq); } catch (Exception ex) { @@ -377,66 +362,34 @@ public final class CqServiceImpl implements CqService { /** * Removes given CQ from the cqMap.. */ - public void removeCq(String cqName) { + void removeCq(String cqName) { // On server side cqName will be server side cqName. synchronized (cqQueryMapLock) { - HashMap tmpCqQueryMap = new HashMap(cqQueryMap); + HashMap tmpCqQueryMap = new HashMap<>(cqQueryMap); tmpCqQueryMap.remove(cqName); this.cqNameToUserAttributesMap.remove(cqName); cqQueryMap = tmpCqQueryMap; } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#getClientCqFromServer(org.apache.geode. - * internal.cache.tier.sockets.ClientProxyMembershipID, java.lang.String) - */ @Override public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) { // On server side cqName will be server side cqName. HashMap cqMap = cqQueryMap; - return (CqQuery) cqMap.get(this.constructServerCqName(clientCqName, clientProxyId)); + return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId)); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#getCq(java.lang.String) - */ @Override public InternalCqQuery getCq(String cqName) { // On server side cqName will be server side cqName. - return (InternalCqQuery) cqQueryMap.get(cqName); + return cqQueryMap.get(cqName); } - /** - * Clears the CQ Query Map. - */ - public void clearCqQueryMap() { - // On server side cqName will be server side cqName. - synchronized (cqQueryMapLock) { - cqQueryMap = new HashMap(); - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs() - */ @Override public Collection getAllCqs() { return cqQueryMap.values(); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#getAllCqs(java.lang.String) - */ @Override public Collection getAllCqs(final String regionName) throws CqException { @@ -445,7 +398,7 @@ public final class CqServiceImpl implements CqService { LocalizedStrings.CqService_NULL_ARGUMENT_0.toLocalizedString("regionName")); } - String[] cqNames = null; + String[] cqNames; synchronized (this.baseRegionToCqNameMap) { ArrayList cqs = this.baseRegionToCqNameMap.get(regionName); @@ -456,7 +409,7 @@ public final class CqServiceImpl implements CqService { cqs.toArray(cqNames); } - ArrayList cQueryList = new ArrayList(); + ArrayList cQueryList = new ArrayList<>(); for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) { InternalCqQuery cq = getCq(cqNames[cqCnt]); if (cq != null) { @@ -467,34 +420,16 @@ public final class CqServiceImpl implements CqService { return cQueryList; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#executeAllClientCqs() - */ @Override public synchronized void executeAllClientCqs() throws CqException { executeCqs(this.getAllCqs()); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#executeAllRegionCqs(java.lang.String) - */ @Override public synchronized void executeAllRegionCqs(final String regionName) throws CqException { executeCqs(getAllCqs(regionName)); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#executeCqs(org.apache.geode.cache.query - * .CqQuery[]) - */ @Override public synchronized void executeCqs(Collection cqs) throws CqException { @@ -503,53 +438,31 @@ public final class CqServiceImpl implements CqService { } String cqName = null; for (InternalCqQuery internalCq : cqs) { - CqQuery cq = (CqQuery) internalCq; + CqQuery cq = internalCq; if (!cq.isClosed() && cq.isStopped()) { try { cqName = cq.getName(); cq.execute(); - } catch (QueryException qe) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName, - qe.getMessage()); - } - } catch (CqClosedException cce) { + } catch (QueryException | CqClosedException e) { if (logger.isDebugEnabled()) { logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName, - cce.getMessage()); + e.getMessage()); } } } } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllClientCqs() - */ @Override public synchronized void stopAllClientCqs() throws CqException { stopCqs(this.getAllCqs()); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#stopAllRegionCqs(java.lang.String) - */ @Override public synchronized void stopAllRegionCqs(final String regionName) throws CqException { stopCqs(this.getAllCqs(regionName)); } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#stopCqs(org.apache.geode.cache.query. - * CqQuery[]) - */ @Override public synchronized void stopCqs(Collection cqs) throws CqException { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -567,29 +480,20 @@ public final class CqServiceImpl implements CqService { String cqName = null; for (InternalCqQuery internalCqQuery : cqs) { - CqQuery cq = (CqQuery) internalCqQuery; + CqQuery cq = internalCqQuery; if (!cq.isClosed() && cq.isRunning()) { try { cqName = cq.getName(); cq.stop(); - } catch (QueryException qe) { - if (isDebugEnabled) { - logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, qe.getMessage()); - } - } catch (CqClosedException cce) { + } catch (QueryException | CqClosedException e) { if (isDebugEnabled) { - logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, cce.getMessage()); + logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage()); } } } } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#closeCqs(java.lang.String) - */ @Override public void closeCqs(final String regionName) throws CqException { Collection cqs = this.getAllCqs(regionName); @@ -603,8 +507,8 @@ public final class CqServiceImpl implements CqService { // invoked on the server cq.close(false); } else { - // @todo grid: if regionName has a pool check its keepAlive - boolean keepAlive = ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive(); + // TODO: grid: if regionName has a pool check its keepAlive + boolean keepAlive = this.cache.keepDurableSubscriptionsAlive(); if (cq.isDurable() && keepAlive) { logger.warn(LocalizedMessage.create( LocalizedStrings.CqService_NOT_SENDING_CQ_CLOSE_TO_THE_SERVER_AS_IT_IS_A_DURABLE_CQ)); @@ -614,14 +518,9 @@ public final class CqServiceImpl implements CqService { } } - } catch (QueryException qe) { + } catch (QueryException | CqClosedException e) { if (logger.isDebugEnabled()) { - logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, qe.getMessage()); - } - } catch (CqClosedException cce) { - if (logger.isDebugEnabled()) { - logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, - cce.getMessage()); + logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage()); } } } @@ -630,10 +529,6 @@ public final class CqServiceImpl implements CqService { /** * Called directly on server side. - * - * @param cqName - * @param clientId - * @throws CqException */ @Override public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException { @@ -650,8 +545,6 @@ public final class CqServiceImpl implements CqService { try { HashMap cqMap = cqQueryMap; if (!cqMap.containsKey(serverCqName)) { - // throw new - // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_STOP_THE_SPECIFIED_CQ_0.toLocalizedString(serverCqName)); /* * gregp 052808: We should silently fail here instead of throwing error. This is to deal * with races in recovery @@ -689,15 +582,8 @@ public final class CqServiceImpl implements CqService { } // Send stop message to peers. cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery); - } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#closeCq(java.lang.String, - * org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID) - */ @Override public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException { String serverCqName = cqName; @@ -713,9 +599,6 @@ public final class CqServiceImpl implements CqService { try { HashMap cqMap = cqQueryMap; if (!cqMap.containsKey(serverCqName)) { - // throw new - // CqException(LocalizedStrings.CqService_CQ_NOT_FOUND_FAILED_TO_CLOSE_THE_SPECIFIED_CQ_0 - // .toLocalizedString(serverCqName)); /* * gregp 052808: We should silently fail here instead of throwing error. This is to deal * with races in recovery @@ -791,12 +674,6 @@ public final class CqServiceImpl implements CqService { } } - - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean) - */ @Override public void closeAllCqs(boolean clientInitiated) { closeAllCqs(clientInitiated, getAllCqs()); @@ -807,21 +684,13 @@ public final class CqServiceImpl implements CqService { * CqQuerys created by other VMs are unaffected. */ private void closeAllCqs(boolean clientInitiated, Collection cqs) { - closeAllCqs(clientInitiated, cqs, - ((GemFireCacheImpl) this.cache).keepDurableSubscriptionsAlive()); + closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive()); } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#closeAllCqs(boolean, - * org.apache.geode.cache.query.CqQuery[], boolean) - */ @Override public void closeAllCqs(boolean clientInitiated, Collection cqs, boolean keepAlive) { - // CqQuery[] cqs = getAllCqs(); if (cqs != null) { String cqName = null; if (logger.isDebugEnabled()) { @@ -830,7 +699,6 @@ public final class CqServiceImpl implements CqService { for (InternalCqQuery cQuery : cqs) { try { cqName = cQuery.getName(); - // boolean keepAlive = ((GemFireCache)this.cache).keepDurableSubscriptionsAlive(); if (isServer()) { cQuery.close(false); @@ -847,47 +715,26 @@ public final class CqServiceImpl implements CqService { } } } - } catch (QueryException cqe) { + } catch (QueryException | CqClosedException e) { if (!isRunning()) { // Not cache shutdown logger .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1, - new Object[] {cqName, cqe.getMessage()})); + new Object[] {cqName, e.getMessage()})); } if (logger.isDebugEnabled()) { - logger.debug(cqe.getMessage(), cqe); - } - } catch (CqClosedException cqe) { - if (!isRunning()) { - // Not cache shutdown - logger - .warn(LocalizedMessage.create(LocalizedStrings.CqService_FAILED_TO_CLOSE_CQ__0___1, - new Object[] {cqName, cqe.getMessage()})); - } - if (logger.isDebugEnabled()) { - logger.debug(cqe.getMessage(), cqe); + logger.debug(e.getMessage(), e); } } } } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#getCqStatistics() - */ @Override public CqServiceStatistics getCqStatistics() { return cqServiceStats; } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#closeClientCqs(org.apache.geode. - * internal.cache.tier.sockets.ClientProxyMembershipID) - */ @Override public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException { final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -899,30 +746,19 @@ public final class CqServiceImpl implements CqService { CqQueryImpl cQuery = (CqQueryImpl) cq; try { cQuery.close(false); - } catch (QueryException qe) { + } catch (QueryException | CqClosedException e) { if (isDebugEnabled) { logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), - qe.getMessage()); - } - } catch (CqClosedException cce) { - if (isDebugEnabled) { - logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), - cce.getMessage()); + e.getMessage()); } } } } - /* - * (non-Javadoc) - * - * @see org.apache.geode.cache.query.internal.InternalCqService#getAllClientCqs(org.apache.geode. - * internal.cache.tier.sockets.ClientProxyMembershipID) - */ @Override public List getAllClientCqs(ClientProxyMembershipID clientProxyId) { Collection cqs = getAllCqs(); - ArrayList clientCqs = new ArrayList(); + ArrayList clientCqs = new ArrayList<>(); for (InternalCqQuery cq : cqs) { ServerCQImpl cQuery = (ServerCQImpl) cq; @@ -934,23 +770,16 @@ public final class CqServiceImpl implements CqService { return clientCqs; } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#getAllDurableClientCqs(org.apache.geode - * .internal.cache.tier.sockets.ClientProxyMembershipID) - */ @Override public List getAllDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException { if (clientProxyId == null) { throw new CqException( LocalizedStrings.CqService_UNABLE_TO_RETRIEVE_DURABLE_CQS_FOR_CLIENT_PROXY_ID - .toLocalizedString(clientProxyId)); + .toLocalizedString(null)); } List cqs = getAllClientCqs(clientProxyId); - ArrayList durableClientCqs = new ArrayList(); + ArrayList durableClientCqs = new ArrayList<>(); for (ServerCQ cq : cqs) { ServerCQImpl cQuery = (ServerCQImpl) cq; @@ -966,9 +795,6 @@ public final class CqServiceImpl implements CqService { /** * Server side method. Closes non-durable CQs for the given client proxy id. - * - * @param clientProxyId - * @throws CqException */ @Override public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException { @@ -983,15 +809,10 @@ public final class CqServiceImpl implements CqService { if (!cQuery.isDurable()) { cQuery.close(false); } - } catch (QueryException qe) { - if (isDebugEnabled) { - logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), - qe.getMessage()); - } - } catch (CqClosedException cce) { + } catch (QueryException | CqClosedException e) { if (isDebugEnabled) { logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), - cce.getMessage()); + e.getMessage()); } } } @@ -1028,6 +849,7 @@ public final class CqServiceImpl implements CqService { return this.isRunning; } + @Override public void start() { this.isRunning = true; } @@ -1035,9 +857,10 @@ public final class CqServiceImpl implements CqService { /** * @return Returns the serverCqName. */ + @Override public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) { - ConcurrentHashMap cache = serverCqNameCache - .computeIfAbsent(cqName, key -> new ConcurrentHashMap()); + ConcurrentHashMap cache = + serverCqNameCache.computeIfAbsent(cqName, key -> new ConcurrentHashMap<>()); String cName = cache.get(clientProxyId); if (null == cName) { @@ -1065,7 +888,7 @@ public final class CqServiceImpl implements CqService { } } - /* + /** * Checks if CQ with the given name already exists. * * @param cqName name of the CQ. @@ -1073,17 +896,15 @@ public final class CqServiceImpl implements CqService { * @return true if exists else false. */ private synchronized boolean isCqExists(String cqName) { - boolean status = false; HashMap cqMap = cqQueryMap; - status = cqMap.containsKey(cqName); - return status; + return cqMap.containsKey(cqName); } - /* + /** * Generates a name for CQ. Checks if CQ with that name already exists if so generates a new * cqName. */ - public synchronized String generateCqName() { + private synchronized String generateCqName() { while (true) { String cqName = CQ_NAME_PREFIX + (cqId++); if (!isCqExists(cqName)) { @@ -1092,18 +913,9 @@ public final class CqServiceImpl implements CqService { } } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#dispatchCqListeners(java.util.HashMap, - * int, java.lang.Object, java.lang.Object, byte[], - * org.apache.geode.cache.client.internal.QueueManager, org.apache.geode.internal.cache.EventID) - */ @Override public void dispatchCqListeners(HashMap cqs, int messageType, Object key, Object value, byte[] delta, QueueManager qManager, EventID eventId) { - ClientCQImpl cQuery = null; Object[] fullValue = new Object[1]; Iterator> iter = cqs.entrySet().iterator(); String cqName = null; @@ -1112,7 +924,7 @@ public final class CqServiceImpl implements CqService { try { Map.Entry entry = iter.next(); cqName = entry.getKey(); - cQuery = (ClientCQImpl) this.getCq(cqName); + ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName); if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) { if (isDebugEnabled) { @@ -1122,7 +934,7 @@ public final class CqServiceImpl implements CqService { continue; } - Integer cqOp = (Integer) entry.getValue(); + Integer cqOp = entry.getValue(); // If Region destroy event, close the cq. if (cqOp.intValue() == MessageType.DESTROY_REGION) { @@ -1136,8 +948,7 @@ public final class CqServiceImpl implements CqService { } // Construct CqEvent. - CqEventImpl cqEvent = null; - cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp.intValue()), + CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp), key, value, delta, qManager, eventId); // Update statistics @@ -1181,11 +992,11 @@ public final class CqServiceImpl implements CqService { } // iteration. } - public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) { + void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) { invokeListeners(cqName, cQuery, cqEvent, null); } - public void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent, + private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent, Object[] fullValue) { if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) { return; @@ -1217,8 +1028,8 @@ public final class CqServiceImpl implements CqService { } Part result = (Part) GetEventValueOp .executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null); - Object newVal = null; - if (result == null || (newVal = result.getObject()) == null) { + Object newVal = result.getObject(); + if (result == null || newVal == null) { if (!cache.getCancelCriterion().isCancelInProgress()) { Exception ex = new Exception("Failed to retrieve full value from server for eventID " @@ -1231,7 +1042,7 @@ public final class CqServiceImpl implements CqService { } } } else { - ((GemFireCacheImpl) this.cache).getCachePerfStats().incDeltaFullValuesRequested(); + this.cache.getCachePerfStats().incDeltaFullValuesRequested(); cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(), cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(), cqEvent.getQueueManager(), cqEvent.getEventID()); @@ -1278,7 +1089,7 @@ public final class CqServiceImpl implements CqService { } } - public void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) { + private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) { if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) { return; } @@ -1335,12 +1146,8 @@ public final class CqServiceImpl implements CqService { } } - /** * Returns the Operation for the given EnumListenerEvent type. - * - * @param eventType - * @return Operation */ private Operation getOperation(int eventType) { Operation op = null; @@ -1372,15 +1179,6 @@ public final class CqServiceImpl implements CqService { return op; } - /* - * (non-Javadoc) - * - * @see - * org.apache.geode.cache.query.internal.InternalCqService#processEvents(org.apache.geode.cache. - * CacheEvent, org.apache.geode.distributed.internal.DistributionAdvisor.Profile, - * org.apache.geode.distributed.internal.DistributionAdvisor.Profile[], - * org.apache.geode.internal.cache.FilterRoutingInfo) - */ @Override public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles, FilterRoutingInfo frInfo) throws CqException { @@ -1421,7 +1219,7 @@ public final class CqServiceImpl implements CqService { continue; } Map cqs = pf.getCqMap(); - HashMap cqInfo = new HashMap(); + HashMap cqInfo = new HashMap<>(); Iterator cqIter = cqs.entrySet().iterator(); while (cqIter.hasNext()) { Map.Entry cqEntry = (Map.Entry) cqIter.next(); @@ -1454,10 +1252,10 @@ public final class CqServiceImpl implements CqService { private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles, FilterRoutingInfo frInfo) throws CqException { final boolean isDebugEnabled = logger.isDebugEnabled(); - HashSet cqUnfilteredEventsSet_newValue = new HashSet(); - HashSet cqUnfilteredEventsSet_oldValue = new HashSet(); - boolean b_cqResults_newValue = false; - boolean b_cqResults_oldValue = false; + HashSet cqUnfilteredEventsSet_newValue = new HashSet<>(); + HashSet cqUnfilteredEventsSet_oldValue = new HashSet<>(); + boolean b_cqResults_newValue; + boolean b_cqResults_oldValue; boolean queryOldValue; EntryEvent entryEvent = (EntryEvent) event; Object eventKey = entryEvent.getKey(); @@ -1472,8 +1270,8 @@ public final class CqServiceImpl implements CqService { || event.getOperation().isDestroy() || event.getOperation().isInvalidate() || (event.getOperation().isCreate() && isDupEvent)); - HashMap matchedCqs = new HashMap(); - long executionStartTime = 0; + HashMap matchedCqs = new HashMap<>(); + long executionStartTime; for (int i = -1; i < profiles.length; i++) { CacheProfile cf; if (i < 0) { @@ -1498,7 +1296,6 @@ public final class CqServiceImpl implements CqService { continue; } - // Get new value. If its not retrieved. if (cqUnfilteredEventsSet_newValue.isEmpty() && (event.getOperation().isCreate() || event.getOperation().isUpdate())) { @@ -1509,7 +1306,7 @@ public final class CqServiceImpl implements CqService { } } - HashMap cqInfo = new HashMap(); + HashMap cqInfo = new HashMap<>(); Iterator cqIter = cqs.entrySet().iterator(); while (cqIter.hasNext()) { @@ -1546,7 +1343,6 @@ public final class CqServiceImpl implements CqService { } } else { boolean error = false; - // synchronized (cQuery) { try { synchronized (cQuery) { @@ -1644,7 +1440,7 @@ public final class CqServiceImpl implements CqService { cQuery.markAsDestroyedInCqResultKeys(eventKey); } } - } // end synchronized(cQuery) + } // Get the matching CQs if any. // synchronized (this.matchingCqMap){ @@ -1663,7 +1459,6 @@ public final class CqServiceImpl implements CqService { } } } - // } } if (cqEvent != null && cQuery.isRunning()) { @@ -1694,153 +1489,35 @@ public final class CqServiceImpl implements CqService { } // iteration over Profiles. } - - /* - * public void processEvents (EnumListenerEvent operation, CacheEvent event, ClientUpdateMessage - * clientMessage, CM> clientIds) throws CqException - * { - * - * //Is this a region event or an entry event if (event instanceof RegionEvent){ - * processRegionEvent(operation, event, clientMessage, clientIds); } else { processEntryEvent - * (operation, event, clientMessage, clientIds); } - * - * } - * - * private void processRegionEvent(EnumListenerEvent operation, CacheEvent event, - * ClientUpdateMessage clientMessage, CM> clientIds) - * throws CqException { - * - * if (logger.isDebugEnabled()) { logger.debug("Processing region event for region " + - * ((LocalRegion)(event.getRegion())).getName()); } HashMap filteredCqs = new HashMap(); Integer - * cqRegionEvent = generateCqRegionEvent(operation); Iterator it = - * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next(); - * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); CM cqsToBooleans = - * (CM)me.getValue(); if (cqsToBooleans == null) { continue; } Set cqs = - * cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } filteredCqs.clear(); Iterator cqIt = - * cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery = (CqQueryImpl)cqIt.next(); if - * (operation == EnumListenerEvent.AFTER_REGION_DESTROY) { try { if (logger.isDebugEnabled()){ - * logger.debug("Closing CQ on region destroy event. CqName :" + cQuery.getName()); } - * cQuery.close(false); } catch (Exception ex) { - * logger.debug("Failed to Close CQ on region destroy. CqName :" + cQuery.getName(), ex); } - * - * } filteredCqs.put(cQuery.cqName, cqRegionEvent); - * cQuery.getVsdStats().updateStats(cqRegionEvent); - * - * } if (!filteredCqs.isEmpty()){ ((ClientUpdateMessageImpl)clientMessage).addClientCqs( clientId, - * filteredCqs); } - * - * } - * - * } - * - * private void processEntryEvent(EnumListenerEvent operation, CacheEvent event, - * ClientUpdateMessage clientMessage, CM> clientIds) - * throws CqException { HashSet cqUnfilteredEventsSet_newValue = new HashSet(); HashSet - * cqUnfilteredEventsSet_oldValue = new HashSet(); boolean b_cqResults_newValue = false; boolean - * b_cqResults_oldValue = false; EntryEvent entryEvent = (EntryEvent)event; Object eventKey = - * entryEvent.getKey(); if (operation == EnumListenerEvent.AFTER_CREATE || operation == - * EnumListenerEvent.AFTER_UPDATE) { if (entryEvent.getNewValue() != null) { //We have a new value - * to run the query on cqUnfilteredEventsSet_newValue.clear(); - * cqUnfilteredEventsSet_newValue.add(entryEvent.getNewValue()); } } - * - * HashMap matchedCqs = new HashMap(); long executionStartTime = 0; Iterator it = - * clientIds.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next(); - * ClientProxyMembershipID clientId = (ClientProxyMembershipID)me.getKey(); if - * (logger.isDebugEnabled()) { logger.debug("Processing event for CQ filter, ClientId : " + - * clientId); } CM cqsToBooleans = (CM)me.getValue(); if (cqsToBooleans == null) { continue; } - * Set cqs = cqsToBooleans.keySet(); if (cqs.isEmpty()) { continue; } HashMap filteredCqs - * = new HashMap(); Iterator cqIt = cqs.iterator(); while (cqIt.hasNext()) { CqQueryImpl cQuery = - * (CqQueryImpl)cqIt.next(); b_cqResults_newValue = false; b_cqResults_oldValue = false; if - * (cQuery == null || !(cQuery.isRunning())){ continue; } String cqName = - * cQuery.getServerCqName(); Integer cqEvent = null; if (matchedCqs.containsKey(cqName)) { if - * (logger.isDebugEnabled()){ logger. - * debug("Similar cq/query is already processed, getting the cq event-type from the matched cq."); - * } cqEvent = (Integer)matchedCqs.get(cqName); } else { boolean error = false; boolean - * hasSeenEvent = false; HashSet cqEventKeys = null; synchronized (cQuery) { try { // Apply query - * on new value. if (!cqUnfilteredEventsSet_newValue.isEmpty()) { executionStartTime = - * this.stats.startCqQueryExecution(); b_cqResults_newValue = evaluateQuery(cQuery, new Object[] - * {cqUnfilteredEventsSet_newValue}); this.stats.endCqQueryExecution(executionStartTime); } // - * Check if old value is cached, if not apply query on old value. if (cqToCqEventKeysMap != null) - * { synchronized (cqToCqEventKeysMap) { if ((cqEventKeys = - * (HashSet)cqToCqEventKeysMap.get(cqName)) != null) { hasSeenEvent = - * cqEventKeys.contains(eventKey); } } } if (!hasSeenEvent) { // get the oldValue. // In case of - * Update, destroy and invalidate. if (operation == EnumListenerEvent.AFTER_UPDATE || operation == - * EnumListenerEvent.AFTER_DESTROY || operation == EnumListenerEvent.AFTER_INVALIDATE) { if - * (entryEvent.getOldValue() != null) { cqUnfilteredEventsSet_oldValue.clear(); - * cqUnfilteredEventsSet_oldValue.add(entryEvent.getOldValue()); // Apply query on old value. - * executionStartTime = this.stats.startCqQueryExecution(); b_cqResults_oldValue = - * evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue}); - * this.stats.endCqQueryExecution(executionStartTime); } } } } catch (Exception ex) { //Any - * exception in running the query // should be caught here and buried //because this code is - * running inline with the //message processing code and we don't want to //kill that thread error - * = true; logger.info( LocalizedStrings. - * CqService_ERROR_WHILE_PROCESSING_CQ_ON_THE_EVENT_KEY_0_CQNAME_1_CLIENTID_2_ERROR_3, new - * Object[] { ((EntryEvent)event).getKey(), cQuery.getName(), clientId, - * ex.getLocalizedMessage()}); } - * - * if (error) { cqEvent = Integer.valueOf(MessageType.EXCEPTION); } else { if - * (b_cqResults_newValue) { if (hasSeenEvent || b_cqResults_oldValue) { cqEvent = - * Integer.valueOf(MessageType.LOCAL_UPDATE); } else { cqEvent = - * Integer.valueOf(MessageType.LOCAL_CREATE); } // If its create and caching is enabled, cache the - * key for this CQ. if (!hasSeenEvent && cqEventKeys != null) { cqEventKeys.add(eventKey); } } - * else if (hasSeenEvent || (b_cqResults_oldValue)) { // Base invalidate operation is treated as - * destroy. // When the invalidate comes through, the entry will no longer satisfy // the query - * and will need to be deleted. cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY); // If - * caching is enabled, remove this event's key from the cache. if (hasSeenEvent && cqEventKeys != - * null) { cqEventKeys.remove(eventKey); } } } - * - * } //end synchronized(cQuery) - * - * // Get the matching CQs if any. synchronized (this.matchingCqMap){ String query = - * cQuery.getQueryString(); ArrayList matchingCqs = (ArrayList)matchingCqMap.get(query); if - * (matchingCqs != null) { Iterator iter = matchingCqs.iterator(); while (iter.hasNext()) { String - * matchingCqName = (String)iter.next(); if (!matchingCqName.equals(cqName)){ - * matchedCqs.put(matchingCqName, cqEvent); } } } } - * - * } - * - * if (cqEvent != null){ if (logger.isDebugEnabled()) { - * logger.debug("Event is added for the CQ, CqName (clientside): " + cQuery.cqName + - * " With CQ Op : " + cqEvent + " for Client : " + clientId); } filteredCqs.put(cQuery.cqName, - * cqEvent); cQuery.getVsdStats().updateStats(cqEvent); } - * - * } // iteration over cqsToBooleans.keySet() if (!filteredCqs.isEmpty()){ - * logger.debug("Adding event map for client : "+clientId + - * " with event map size : "+filteredCqs.size()); - * ((ClientUpdateMessageImpl)clientMessage).addClientCqs(clientId, filteredCqs); } } // iteration - * over clientIds.entrySet() } - */ - private Integer generateCqRegionEvent(CacheEvent event) { Integer cqEvent = null; if (event.getOperation().isRegionDestroy()) { - cqEvent = Integer.valueOf(MessageType.DESTROY_REGION); + cqEvent = MessageType.DESTROY_REGION; } else if (event.getOperation().isRegionInvalidate()) { - cqEvent = Integer.valueOf(MessageType.INVALIDATE_REGION); + cqEvent = MessageType.INVALIDATE_REGION; } else if (event.getOperation().isClear()) { - cqEvent = Integer.valueOf(MessageType.CLEAR_REGION); + cqEvent = MessageType.CLEAR_REGION; } return cqEvent; } - /** * Manages the CQs created for the base region. This is managed here, instead of on the base * region; since the cq could be created on the base region, before base region is created (using * newCq()). */ - public void addToBaseRegionToCqNameMap(String regionName, String cqName) { + private void addToBaseRegionToCqNameMap(String regionName, String cqName) { synchronized (this.baseRegionToCqNameMap) { ArrayList cqs = this.baseRegionToCqNameMap.get(regionName); if (cqs == null) { - cqs = new ArrayList(); + cqs = new ArrayList<>(); } cqs.add(cqName); this.baseRegionToCqNameMap.put(regionName, cqs); } } - public void removeFromBaseRegionToCqNameMap(String regionName, String cqName) { + void removeFromBaseRegionToCqNameMap(String regionName, String cqName) { synchronized (this.baseRegionToCqNameMap) { ArrayList cqs = this.baseRegionToCqNameMap.get(regionName); if (cqs != null) { @@ -1864,37 +1541,12 @@ public final class CqServiceImpl implements CqService { } /** - * Removes this CQ from CQ event Cache map. This disables the caching events for this CQ. - * - * @param cqName - */ - /* - * synchronized public void removeCQFromCaching(String cqName){ if (cqToCqEventKeysMap != null) { - * // Take a lock on CqQuery object. In processEvents the maps are // handled under CqQuery - * object. if (cqToCqEventKeysMap != null){ synchronized (cqToCqEventKeysMap) { - * cqToCqEventKeysMap.remove(cqName); } } } } - */ - - /** - * Returns the CQ event cache map. - * - * @return HashMap cqToCqEventKeysMap - * - * Caller must synchronize on the returned value in order to inspect. - */ - /* - * public HashMap getCqToCqEventKeysMap(){ return cqToCqEventKeysMap; } - */ - - /** * Adds the query from the given CQ to the matched CQ map. - * - * @param cq */ - public void addToMatchingCqMap(CqQueryImpl cq) { + void addToMatchingCqMap(CqQueryImpl cq) { synchronized (this.matchingCqMap) { String cqQuery = cq.getQueryString(); - Set matchingCQs = null; + Set matchingCQs; if (!matchingCqMap.containsKey(cqQuery)) { matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap()); matchingCqMap.put(cqQuery, matchingCQs); @@ -1912,10 +1564,8 @@ public final class CqServiceImpl implements CqService { /** * Removes the query from the given CQ from the matched CQ map. - * - * @param cq */ - public void removeFromMatchingCqMap(CqQueryImpl cq) { + private void removeFromMatchingCqMap(CqQueryImpl cq) { synchronized (this.matchingCqMap) { String cqQuery = cq.getQueryString(); if (matchingCqMap.containsKey(cqQuery)) { @@ -1947,10 +1597,6 @@ public final class CqServiceImpl implements CqService { * Applies the query on the event. This method takes care of the performance related changed done * to improve the CQ-query performance. When CQ-query is executed first time, it saves the query * related information in the execution context and uses that info in later executions. - * - * @param cQuery - * @param event - * @return boolean */ private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception { ExecutionContext execContext = cQuery.getQueryExecutionContext(); @@ -1983,19 +1629,6 @@ public final class CqServiceImpl implements CqService { return this.cqNameToUserAttributesMap.get(cqName); } - // public static void memberLeft(String poolName) { - // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) { - // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName); - // } - // } - // - // public static void memberCrashed(String poolName) { - // if (cqServiceSingleton != null && !cqServiceSingleton.isServer()) { - // cqServiceSingleton.sendMemberDisconnectedMessageToCqs(poolName); - // } - // } - // - @Override public void cqsDisconnected(Pool pool) { invokeCqsConnected(pool, false); @@ -2014,7 +1647,7 @@ public final class CqServiceImpl implements CqService { // Check to see if we are already connected/disconnected. // If state has not changed, do not invoke another connected/disconnected synchronized (cqPoolsConnected) { - // don't repeatily send same connect/disconnect message to cq's on repeated fails of + // don't repeatedly send same connect/disconnect message to cq's on repeated fails of // RedundancySatisfier if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) { return; @@ -2059,13 +1692,6 @@ public final class CqServiceImpl implements CqService { SystemFailure.checkFailure(); logger.warn(LocalizedMessage .create(LocalizedStrings.CqService_ERROR_SENDING_CQ_CONNECTION_STATUS, cqName), t); - - if (t instanceof VirtualMachineError) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.CqService_VIRTUALMACHINEERROR_PROCESSING_CQLISTENER_FOR_CQ_0, - cqName), t); - return; - } } } } @@ -2075,7 +1701,4 @@ public final class CqServiceImpl implements CqService { public List getAllDurableCqsFromServer(InternalPool pool) { return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer(); } - - } - http://git-wip-us.apache.org/repos/asf/geode/blob/363e50d2/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java index ba71143..a675162 100644 --- a/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java +++ b/geode-cq/src/main/java/org/apache/geode/cache/query/internal/cq/CqServiceStatisticsImpl.java @@ -14,11 +14,9 @@ */ package org.apache.geode.cache.query.internal.cq; -import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.query.CqServiceStatistics; import org.apache.geode.cache.query.CqQuery; import org.apache.geode.cache.query.internal.DefaultQueryService; -import org.apache.geode.internal.cache.GemFireCacheImpl; /** * Provides statistical information about CqService. @@ -26,24 +24,22 @@ import org.apache.geode.internal.cache.GemFireCacheImpl; * @since GemFire 5.5 */ public class CqServiceStatisticsImpl implements CqServiceStatistics { + private CqServiceImpl cqService; - // private long activeCqs; - // private long stoppedCqs; - // private long closedCqs; - // private long createdCqs; /** * Constructor for CqStatisticsImpl * * @param cqs - CqService */ - public CqServiceStatisticsImpl(CqServiceImpl cqs) { + CqServiceStatisticsImpl(CqServiceImpl cqs) { cqService = cqs; } /** * Returns the number of CQs currently executing */ + @Override public long numCqsActive() { return this.cqService.getCqServiceVsdStats().getNumCqsActive(); } @@ -53,6 +49,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics { * * @return long number of cqs created. */ + @Override public long numCqsCreated() { return this.cqService.getCqServiceVsdStats().getNumCqsCreated(); } @@ -60,6 +57,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics { /** * Returns number of Cqs that are closed. */ + @Override public long numCqsClosed() { return this.cqService.getCqServiceVsdStats().getNumCqsClosed(); } @@ -67,6 +65,7 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics { /** * Returns number of Cqs that are stopped. */ + @Override public long numCqsStopped() { return this.cqService.getCqServiceVsdStats().getNumCqsStopped(); } @@ -74,20 +73,18 @@ public class CqServiceStatisticsImpl implements CqServiceStatistics { /** * Returns number of CQs created from the client. */ + @Override public long numCqsOnClient() { return this.cqService.getCqServiceVsdStats().getNumCqsOnClient(); } /** * Returns the number of CQs (active + suspended) on the given region. - * - * @param regionName */ + @Override public long numCqsOnRegion(String regionName) { - DefaultQueryService queryService = - (DefaultQueryService) ((GemFireCacheImpl) CacheFactory.getAnyInstance()) - .getLocalQueryService(); + (DefaultQueryService) cqService.getInternalCache().getLocalQueryService(); try { CqQuery[] cqs = queryService.getCqs(regionName);