Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2DB2418942 for ; Wed, 20 Jan 2016 02:22:15 +0000 (UTC) Received: (qmail 53970 invoked by uid 500); 20 Jan 2016 02:22:15 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 53875 invoked by uid 500); 20 Jan 2016 02:22:15 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 53862 invoked by uid 99); 20 Jan 2016 02:22:14 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jan 2016 02:22:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 8983B1A0649 for ; Wed, 20 Jan 2016 02:22:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.227 X-Spam-Level: * X-Spam-Status: No, score=1.227 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id xUdoZZ-OYr41 for ; Wed, 20 Jan 2016 02:22:08 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 4532F215CE for ; Wed, 20 Jan 2016 02:22:08 +0000 (UTC) Received: (qmail 53741 invoked by uid 99); 20 Jan 2016 02:22:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Jan 2016 02:22:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24719E0484; Wed, 20 Jan 2016 02:22:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rvs@apache.org To: commits@geode.incubator.apache.org Date: Wed, 20 Jan 2016 02:22:56 -0000 Message-Id: <3e76a2033aa14b7a9b863e1ca64f9fb2@git.apache.org> In-Reply-To: <38f5b46407394391bacbec02d10d52c0@git.apache.org> References: <38f5b46407394391bacbec02d10d52c0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqQueryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqQueryImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqQueryImpl.java new file mode 100644 index 0000000..f2aed77 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqQueryImpl.java @@ -0,0 +1,375 @@ +/*========================================================================= + * Copyright Copyright (c) 2000-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + * $Id: DefaultQueryService.java,v 1.2 2005/02/01 17:19:20 vaibhav Exp $ + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.internal.cq; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.StatisticsFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.query.CqClosedException; +import com.gemstone.gemfire.cache.query.CqEvent; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqExistsException; +import com.gemstone.gemfire.cache.query.CqState; +import com.gemstone.gemfire.cache.query.CqStatistics; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef; +import com.gemstone.gemfire.cache.query.internal.CompiledRegion; +import com.gemstone.gemfire.cache.query.internal.CompiledSelect; +import com.gemstone.gemfire.cache.query.internal.CompiledValue; +import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats; +import com.gemstone.gemfire.cache.query.internal.CqStateImpl; +import com.gemstone.gemfire.cache.query.internal.DefaultQuery; +import com.gemstone.gemfire.cache.query.internal.ExecutionContext; +import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.InternalLogWriter; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.i18n.StringId; + +/** + * @author rmadduri + * @author anil + * @since 5.5 + * Represents the CqQuery object. Implements CqQuery API and CqAttributeMutator. + * + */ +public abstract class CqQueryImpl implements InternalCqQuery { + private static final Logger logger = LogService.getLogger(); + + protected String cqName; + + protected String queryString; + + protected static final Object TOKEN = new Object(); + + protected LocalRegion cqBaseRegion; + + protected Query query = null; + + protected InternalLogWriter securityLogWriter; + + protected CqServiceImpl cqService; + + protected String regionName; + + protected boolean isDurable = false ; + + // Stats counters + protected CqStatisticsImpl cqStats; + + protected CqQueryVsdStats stats; + + protected final CqStateImpl cqState = new CqStateImpl(); + + protected ExecutionContext queryExecutionContext = null; + + public static TestHook testHook = null; + + /** + * Constructor. + */ + public CqQueryImpl(){ + } + + public CqQueryImpl(CqServiceImpl cqService, String cqName, String queryString, boolean isDurable) { + this.cqName = cqName; + this.queryString = queryString; + this.securityLogWriter = (InternalLogWriter) cqService.getCache().getSecurityLoggerI18n(); + this.cqService = cqService; + this.isDurable = isDurable ; + } + + /** + * returns CQ name + */ + public String getName() { + return this.cqName; + } + + @Override + public void setName(String cqName) { + this.cqName = cqName; + } + + public void setCqService(CqService cqService) { + this.cqService = (CqServiceImpl) cqService; + } + + /** + * get the region name in CQ query + */ + @Override + public String getRegionName() { + return this.regionName; + } + + public void updateCqCreateStats() { + // Initialize the VSD statistics + StatisticsFactory factory = cqService.getCache().getDistributedSystem(); + this.stats = new CqQueryVsdStats(factory, getServerCqName()); + this.cqStats = new CqStatisticsImpl(this); + + // Update statistics with CQ creation. + this.cqService.stats.incCqsStopped(); + this.cqService.stats.incCqsCreated(); + this.cqService.stats.incCqsOnClient(); + } + + /** + * Validates the CQ. Checks for cq constraints. + * Also sets the base region name. + */ + public void validateCq() { + Cache cache = cqService.getCache(); + DefaultQuery locQuery = (DefaultQuery)((GemFireCacheImpl)cache).getLocalQueryService().newQuery(this.queryString); + this.query = locQuery; +// assert locQuery != null; + + // validate Query. + Object[] parameters = new Object[0]; // parameters are not permitted + + // check that it is only a SELECT statement (possibly with IMPORTs) + CompiledSelect select = locQuery.getSimpleSelect(); + if (select == null) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_MUST_BE_A_SELECT_STATEMENT_ONLY.toLocalizedString()); + } + + // must not be a DISTINCT select + if (select.isDistinct()) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_SELECT_DISTINCT_QUERIES_NOT_SUPPORTED_IN_CQ.toLocalizedString()); + } + + // get the regions referenced in this query + Set regionsInQuery = locQuery.getRegionsInQuery(parameters); + // check for more than one region is referenced in the query + // (though it could still be one region referenced multiple times) + if (regionsInQuery.size() > 1 || regionsInQuery.size() < 1) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_MUST_REFERENCE_ONE_AND_ONLY_ONE_REGION.toLocalizedString()); + } + this.regionName = (String)regionsInQuery.iterator().next(); + + // make sure the where clause references no regions + Set regions = new HashSet(); + CompiledValue whereClause = select.getWhereClause(); + if (whereClause != null) { + whereClause.getRegionsInQuery(regions, parameters); + if (!regions.isEmpty()) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_THE_WHERE_CLAUSE_IN_CQ_QUERIES_CANNOT_REFER_TO_A_REGION.toLocalizedString()); + } + } + List fromClause = select.getIterators(); + // cannot have more than one iterator in FROM clause + if (fromClause.size() > 1) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_CANNOT_HAVE_MORE_THAN_ONE_ITERATOR_IN_THE_FROM_CLAUSE.toLocalizedString()); + } + + // the first iterator in the FROM clause must be just a CompiledRegion + CompiledIteratorDef itrDef = (CompiledIteratorDef)fromClause.get(0); + // By process of elimination, we know that the first iterator contains a reference + // to the region. Check to make sure it is only a CompiledRegion + if (!(itrDef.getCollectionExpr() instanceof CompiledRegion)) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_MUST_HAVE_A_REGION_PATH_ONLY_AS_THE_FIRST_ITERATOR_IN_THE_FROM_CLAUSE.toLocalizedString()); + } + + // must not have any projections + List projs = select.getProjectionAttributes(); + if (projs != null) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_DO_NOT_SUPPORT_PROJECTIONS.toLocalizedString()); + } + + // check the orderByAttrs, not supported + List orderBys = select.getOrderByAttrs(); + if (orderBys != null) { + throw new UnsupportedOperationException(LocalizedStrings.CqQueryImpl_CQ_QUERIES_DO_NOT_SUPPORT_ORDER_BY.toLocalizedString()); + } + + // Set Query ExecutionContext, that will be used in later execution. + this.setQueryExecutionContext(new QueryExecutionContext(null, this.cqService.getCache())); + } + + /** + * Removes the CQ from CQ repository. + * @throws CqException + */ + protected void removeFromCqMap() throws CqException { + try { + cqService.removeCq(this.getServerCqName()); + } catch (Exception ex){ + StringId errMsg = LocalizedStrings.CqQueryImpl_FAILED_TO_REMOVE_CONTINUOUS_QUERY_FROM_THE_REPOSITORY_CQNAME_0_ERROR_1; + Object[] errMsgArgs = new Object[] {cqName, ex.getLocalizedMessage()}; + String msg = errMsg.toLocalizedString(errMsgArgs); + logger.error(msg); + throw new CqException(msg, ex); + } + if (logger.isDebugEnabled()){ + logger.debug("Removed CQ from the CQ repository. CQ Name: {}", this.cqName); + } + } + + /** + * Returns the QueryString of this CQ. + */ + public String getQueryString() { + return queryString; + } + + /** + * Return the query after replacing region names with parameters + * @return the Query for the query string + */ + public Query getQuery(){ + return query; + } + + + /** + * @see com.gemstone.gemfire.cache.query.CqQuery#getStatistics() + */ + public CqStatistics getStatistics() { + return cqStats; + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#getCqBaseRegion() + */ + @Override + public LocalRegion getCqBaseRegion() { + return this.cqBaseRegion; + } + + protected abstract void cleanup() throws CqException; + + /** + * @return Returns the Region name on which this cq is created. + */ + public String getBaseRegionName() { + + return this.regionName; + } + + public abstract String getServerCqName(); + + /** + * Return the state of this query. + * Should not modify this state without first locking it. + * @return STOPPED RUNNING or CLOSED + */ + public CqState getState() { + return this.cqState; + } + + /* (non-Javadoc) + * @see com.gemstone.gemfire.cache.query.internal.InternalCqQuery2#setCqState(int) + */ + @Override + public void setCqState(int state) { + if (this.isClosed()) { + throw new CqClosedException(LocalizedStrings.CqQueryImpl_CQ_IS_CLOSED_CQNAME_0 + .toLocalizedString(this.cqName)); + } + + synchronized (cqState) { + if (state == CqStateImpl.RUNNING){ + if (this.isRunning()) { + //throw new IllegalStateException(LocalizedStrings.CqQueryImpl_CQ_IS_NOT_IN_RUNNING_STATE_STOP_CQ_DOES_NOT_APPLY_CQNAME_0 + // .toLocalizedString(this.cqName)); + } + this.cqState.setState(CqStateImpl.RUNNING); + this.cqService.stats.incCqsActive(); + this.cqService.stats.decCqsStopped(); + } else if(state == CqStateImpl.STOPPED) { + this.cqState.setState(CqStateImpl.STOPPED); + this.cqService.stats.incCqsStopped(); + this.cqService.stats.decCqsActive(); + } else if(state == CqStateImpl.CLOSING) { + this.cqState.setState(state); + } + } + } + + /** + * Update CQ stats + * @param cqEvent object + */ + public void updateStats(CqEvent cqEvent) { + this.stats.updateStats(cqEvent); // Stats for VSD + } + + /** + * Return true if the CQ is in running state + * @return true if running, false otherwise + */ + public boolean isRunning() { + return this.cqState.isRunning(); + } + + /** + * Return true if the CQ is in Sstopped state + * @return true if stopped, false otherwise + */ + public boolean isStopped() { + return this.cqState.isStopped(); + } + + /** + * Return true if the CQ is closed + * @return true if closed, false otherwise + */ + public boolean isClosed() { + return this.cqState.isClosed(); + } + + /** + * Return true if the CQ is in closing state. + * @return true if close in progress, false otherwise + */ + public boolean isClosing() { + return this.cqState.isClosing(); + } + + /** + * Return true if the CQ is durable + * @return true if durable, false otherwise + */ + public boolean isDurable() { + return this.isDurable; + } + + /** + * Returns a reference to VSD stats of the CQ + * @return VSD stats of the CQ + */ + @Override + public CqQueryVsdStats getVsdStats() { + return stats; + } + + public ExecutionContext getQueryExecutionContext() { + return queryExecutionContext; + } + + public void setQueryExecutionContext(ExecutionContext queryExecutionContext) { + this.queryExecutionContext = queryExecutionContext; + } + + /** Test Hook */ + public interface TestHook { + public void pauseUntilReady(); + public void ready(); + public int numQueuedEvents(); + public void setEventCount(int count); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceFactoryImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceFactoryImpl.java b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceFactoryImpl.java new file mode 100644 index 0000000..e39ba53 --- /dev/null +++ b/gemfire-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceFactoryImpl.java @@ -0,0 +1,53 @@ +package com.gemstone.gemfire.cache.query.internal.cq; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.gemstone.gemfire.cache.query.internal.cq.spi.CqServiceFactory; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.tier.Command; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.CommandInitializer; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.CloseCQ; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.ExecuteCQ; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.ExecuteCQ61; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.GetCQStats; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.GetDurableCQs; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.MonitorCQ; +import com.gemstone.gemfire.internal.cache.tier.sockets.command.StopCQ; + +public class CqServiceFactoryImpl implements CqServiceFactory { + + public void initialize() { + { + Map versions = new HashMap(); + versions.put(Version.GFE_57, ExecuteCQ.getCommand()); + versions.put(Version.GFE_61, ExecuteCQ61.getCommand()); + CommandInitializer.registerCommand(MessageType.EXECUTECQ_MSG_TYPE, versions); + CommandInitializer.registerCommand(MessageType.EXECUTECQ_WITH_IR_MSG_TYPE, versions); + } + + CommandInitializer.registerCommand(MessageType.GETCQSTATS_MSG_TYPE, Collections.singletonMap(Version.GFE_57, GetCQStats.getCommand())); + CommandInitializer.registerCommand(MessageType.MONITORCQ_MSG_TYPE, Collections.singletonMap(Version.GFE_57, MonitorCQ.getCommand())); + CommandInitializer.registerCommand(MessageType.STOPCQ_MSG_TYPE, Collections.singletonMap(Version.GFE_57, StopCQ.getCommand())); + CommandInitializer.registerCommand(MessageType.CLOSECQ_MSG_TYPE, Collections.singletonMap(Version.GFE_57, CloseCQ.getCommand())); + CommandInitializer.registerCommand(MessageType.GETDURABLECQS_MSG_TYPE, Collections.singletonMap(Version.GFE_70, GetDurableCQs.getCommand())); + } + + @Override + public CqService create(GemFireCacheImpl cache) { + return new CqServiceImpl(cache); + } + + @Override + public ServerCQ readCqQuery(DataInput in) throws ClassNotFoundException, IOException { + ServerCQImpl cq = new ServerCQImpl(); + cq.fromData(in); + return cq; + } + +}