Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 788E7200C1B for ; Tue, 14 Feb 2017 14:54:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 770C3160B5F; Tue, 14 Feb 2017 13:54:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 21190160B52 for ; Tue, 14 Feb 2017 14:54:39 +0100 (CET) Received: (qmail 54631 invoked by uid 500); 14 Feb 2017 13:54:39 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 54622 invoked by uid 99); 14 Feb 2017 13:54:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2017 13:54:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 38317DFCA3; Tue, 14 Feb 2017 13:54:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Message-Id: <4b80cbf0c5cf4f39bc8344ee277302fc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-4436 API for collecting list of running queries and cancel them. Date: Tue, 14 Feb 2017 13:54:39 +0000 (UTC) archived-at: Tue, 14 Feb 2017 13:54:41 -0000 Repository: ignite Updated Branches: refs/heads/master 8e12513ef -> 49237343d IGNITE-4436 API for collecting list of running queries and cancel them. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49237343 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49237343 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49237343 Branch: refs/heads/master Commit: 49237343d53ee33d44e5599cd7fe7da868ee30a1 Parents: 8e12513 Author: AKuznetsov Authored: Tue Feb 14 20:54:31 2017 +0700 Committer: AKuznetsov Committed: Tue Feb 14 20:54:31 2017 +0700 ---------------------------------------------------------------------- .../processors/query/GridQueryIndexing.java | 17 +- .../processors/query/GridQueryProcessor.java | 32 ++- .../processors/query/GridRunningQueryInfo.java | 132 ++++++++++++ .../internal/visor/VisorMultiNodeTask.java | 2 +- .../visor/query/VisorCancelQueriesTask.java | 72 +++++++ .../query/VisorCollectRunningQueriesTask.java | 96 +++++++++ .../internal/visor/query/VisorRunningQuery.java | 132 ++++++++++++ .../cache/query/GridCacheTwoStepQuery.java | 18 +- .../processors/query/h2/IgniteH2Indexing.java | 81 +++++++- .../query/h2/sql/GridSqlQuerySplitter.java | 4 +- .../h2/twostep/GridReduceQueryExecutor.java | 60 +++++- .../cache/CacheSqlQueryValueCopySelfTest.java | 208 +++++++++++++++++-- .../cache/GridCacheCrossCacheQuerySelfTest.java | 2 +- .../h2/GridIndexingSpiAbstractSelfTest.java | 7 + 14 files changed, 823 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index ef39d96..ca04724 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -241,7 +241,22 @@ public interface GridQueryIndexing { public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException; /** + * Collect queries that already running more than specified duration. + * + * @param duration Duration to check. + * @return Collection of long running queries. + */ + public Collection runningQueries(long duration); + + /** + * Cancel specified queries. + * + * @param queries Queries ID's to cancel. + */ + public void cancelQueries(Collection queries); + + /** * Cancels all executing queries. */ public void cancelAllQueries(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index cb7c7e7..ee9224b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -44,7 +44,6 @@ import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; @@ -119,7 +118,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { private static final int QRY_DETAIL_METRICS_EVICTION_FREQ = 3_000; /** */ - private static Set> SQL_TYPES = new HashSet<>(F.>asList( + private static final Set> SQL_TYPES = new HashSet<>(F.>asList( Integer.class, Boolean.class, Byte.class, @@ -920,6 +919,29 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Collect queries that already running more than specified duration. + * + * @param duration Duration to check. + * @return Collection of long running queries. + */ + public Collection runningQueries(long duration) { + if (moduleEnabled()) + return idx.runningQueries(duration); + + return Collections.emptyList(); + } + + /** + * Cancel specified queries. + * + * @param queries Queries ID's to cancel. + */ + public void cancelQueries(Collection queries) { + if (moduleEnabled()) + idx.cancelQueries(queries); + } + + /** * @param sqlQry Sql query. * @param params Params. */ @@ -2731,7 +2753,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** Accessor that deals with fields. */ - private final static class FieldAccessor implements PropertyAccessor { + private static final class FieldAccessor implements PropertyAccessor { /** Field to access. */ private final Field fld; @@ -2774,7 +2796,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** Getter and setter methods based accessor. */ - private final static class MethodsAccessor implements PropertyAccessor { + private static final class MethodsAccessor implements PropertyAccessor { /** */ private final Method getter; @@ -2832,7 +2854,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** Accessor with getter only. */ - private final static class ReadOnlyMethodsAccessor implements PropertyAccessor { + private static final class ReadOnlyMethodsAccessor implements PropertyAccessor { /** */ private final Method getter; http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java new file mode 100644 index 0000000..d77c8c0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; + +/** + * Query descriptor. + */ +public class GridRunningQueryInfo { + /** */ + private final long id; + + /** */ + private final String qry; + + /** Query type. */ + private final GridCacheQueryType qryType; + + /** */ + private final String cache; + + /** */ + private final long startTime; + + /** */ + private final GridQueryCancel cancel; + + /** */ + private final boolean loc; + + /** + * @param id Query ID. + * @param qry Query text. + * @param qryType Query type. + * @param cache Cache where query was executed. + * @param startTime Query start time. + * @param cancel Query cancel. + * @param loc Local query flag. + */ + public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime, + GridQueryCancel cancel, boolean loc) { + this.id = id; + this.qry = qry; + this.qryType = qryType; + this.cache = cache; + this.startTime = startTime; + this.cancel = cancel; + this.loc = loc; + } + + /** + * @return Query ID. + */ + public Long id() { + return id; + } + + /** + * @return Query text. + */ + public String query() { + return qry; + } + + /** + * @return Query type. + */ + public GridCacheQueryType queryType() { + return qryType; + } + + /** + * @return Cache where query was executed. + */ + public String cache() { + return cache; + } + + /** + * @return Query start time. + */ + public long startTime() { + return startTime; + } + + /** + * @param curTime Current time. + * @param duration Duration of long query. + * @return {@code true} if this query should be considered as long running query. + */ + public boolean longQuery(long curTime, long duration) { + return curTime - startTime > duration; + } + + /** + * Cancel query. + */ + public void cancel() { + if (cancel != null) + cancel.cancel(); + } + + /** + * @return {@code true} if query can be cancelled. + */ + public boolean cancelable() { + return cancel != null; + } + + /** + * @return {@code true} if query is local. + */ + public boolean local() { + return loc; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java index 57f1346..ece1a17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/VisorMultiNodeTask.java @@ -130,4 +130,4 @@ public abstract class VisorMultiNodeTask implements ComputeTask, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCancelQueriesJob job(Collection arg) { + return new VisorCancelQueriesJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected Void reduce0(List results) throws IgniteException { + return null; + } + + /** + * Job to cancel queries on node. + */ + private static class VisorCancelQueriesJob extends VisorJob, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Flag indicating whether debug information should be printed into node log. + */ + protected VisorCancelQueriesJob(@Nullable Collection arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(@Nullable Collection queries) throws IgniteException { + ignite.context().query().cancelQueries(queries); + + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java new file mode 100644 index 0000000..2b40e61 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorCollectRunningQueriesTask.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; +import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.jetbrains.annotations.Nullable; + +/** + * Task to collect currently running queries. + */ +@GridInternal +public class VisorCollectRunningQueriesTask extends VisorMultiNodeTask>, Collection> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorCollectRunningQueriesJob job(Long arg) { + return new VisorCollectRunningQueriesJob(arg, debug); + } + + /** {@inheritDoc} */ + @Nullable @Override protected Map> reduce0(List results) throws IgniteException { + Map> map = new HashMap<>(); + + for (ComputeJobResult res : results) + if (res.getException() == null) { + Collection queries = res.getData(); + + map.put(res.getNode().id(), queries); + } + + return map; + } + + /** + * Job to collect currently running queries from node. + */ + private static class VisorCollectRunningQueriesJob extends VisorJob> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Create job with specified argument. + * + * @param arg Job argument. + * @param debug Flag indicating whether debug information should be printed into node log. + */ + protected VisorCollectRunningQueriesJob(@Nullable Long arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Collection run(@Nullable Long duration) throws IgniteException { + Collection queries = ignite.context().query() + .runningQueries(duration != null ? duration : 0); + + Collection res = new ArrayList<>(queries.size()); + + long curTime = U.currentTimeMillis(); + + for (GridRunningQueryInfo qry : queries) + res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(), + qry.startTime(), curTime - qry.startTime(), + qry.cancelable(), qry.local())); + + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java new file mode 100644 index 0000000..fc6bc7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQuery.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.query; + +import java.io.Serializable; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; + +/** + * Descriptor of running query. + */ +public class VisorRunningQuery implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long id; + + /** Query text. */ + private String qry; + + /** Query type. */ + private GridCacheQueryType qryType; + + /** Cache name for query. */ + private String cache; + + /** */ + private long startTime; + + /** */ + private long duration; + + /** */ + private boolean cancellable; + + /** */ + private boolean loc; + + /** + * @param id Query ID. + * @param qry Query text. + * @param qryType Query type. + * @param cache Cache where query was executed. + * @param startTime Query start time. + * @param duration Query current duration. + * @param cancellable {@code true} if query can be canceled. + * @param loc {@code true} if query is local. + */ + public VisorRunningQuery(long id, String qry, GridCacheQueryType qryType, String cache, + long startTime, long duration, + boolean cancellable, boolean loc) { + this.id = id; + this.qry = qry; + this.qryType = qryType; + this.cache = cache; + this.startTime = startTime; + this.duration = duration; + this.cancellable = cancellable; + this.loc = loc; + } + + /** + * @return Query ID. + */ + public long getId() { + return id; + } + + /** + * @return Query txt. + */ + public String getQuery() { + return qry; + } + + /** + * @return Query type. + */ + public GridCacheQueryType getQueryType() { + return qryType; + } + + /** + * @return Cache name. + */ + public String getCache() { + return cache; + } + + /** + * @return Query start time. + */ + public long getStartTime() { + return startTime; + } + + /** + * @return Query duration. + */ + public long getDuration() { + return duration; + } + + /** + * @return {@code true} if query can be cancelled. + */ + public boolean isCancelable() { + return cancellable; + } + + /** + * @return {@code true} if query is local. + */ + public boolean isLocal() { + return loc; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java index 8dcba2f..f53936f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java @@ -46,6 +46,9 @@ public class GridCacheTwoStepQuery { private boolean explain; /** */ + private String originalSql; + + /** */ private Collection spaces; /** */ @@ -67,10 +70,12 @@ public class GridCacheTwoStepQuery { private List extraCaches; /** + * @param originalSql Original query SQL. * @param schemas Schema names in query. * @param tbls Tables in query. */ - public GridCacheTwoStepQuery(Set schemas, Set tbls) { + public GridCacheTwoStepQuery(String originalSql, Set schemas, Set tbls) { + this.originalSql = originalSql; this.schemas = schemas; this.tbls = tbls; } @@ -196,6 +201,13 @@ public class GridCacheTwoStepQuery { } /** + * @return Original query SQL. + */ + public String originalSql() { + return originalSql; + } + + /** * @return Spaces. */ public Collection spaces() { @@ -223,7 +235,7 @@ public class GridCacheTwoStepQuery { public GridCacheTwoStepQuery copy(Object[] args) { assert !explain; - GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls); + GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls); cp.caches = caches; cp.extraCaches = extraCaches; @@ -250,4 +262,4 @@ public class GridCacheTwoStepQuery { @Override public String toString() { return S.toString(GridCacheTwoStepQuery.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 9416621..e4b0c1f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -52,6 +52,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; @@ -81,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -179,6 +181,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_ import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.IgniteSystemProperties.getString; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.TEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED; @@ -286,9 +291,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { private final Map space2schema = new ConcurrentHashMap8<>(); /** */ + private AtomicLong qryIdGen; + + /** */ private GridSpinBusyLock busyLock; /** */ + private final ConcurrentMap runs = new ConcurrentHashMap8<>(); + + /** */ private final ThreadLocal connCache = new ThreadLocal() { @Nullable @Override public ConnectionWrapper get() { ConnectionWrapper c = super.get(); @@ -773,8 +784,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { IndexingQueryFilter filters) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); - if (tbl != null && tbl.luceneIdx != null) - return tbl.luceneIdx.query(qry, filters); + if (tbl != null && tbl.luceneIdx != null) { + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, spaceName, + U.currentTimeMillis(), null, true); + + try { + runs.put(run.id(), run); + + return tbl.luceneIdx.query(qry, filters); + } + finally { + runs.remove(run.id()); + } + } return new GridEmptyCloseableIterator<>(); } @@ -832,6 +854,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS, + spaceName, U.currentTimeMillis(), cancel, true); + + runs.putIfAbsent(run.id(), run); + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel); @@ -839,6 +866,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { GridH2QueryContext.clearThreadLocal(); + + runs.remove(run.id()); } } }; @@ -1088,6 +1117,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false)); + GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName, + U.currentTimeMillis(), null, true); + + runs.put(run.id(), run); + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null); @@ -1095,6 +1129,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } finally { GridH2QueryContext.clearThreadLocal(); + + runs.remove(run.id()); } } @@ -1739,6 +1775,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { this.busyLock = busyLock; + qryIdGen = new AtomicLong(); + if (SysProperties.serializeJavaObject) { U.warn(log, "Serialization of Java objects in H2 was enabled."); @@ -1793,7 +1831,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { marshaller = ctx.config().getMarshaller(); mapQryExec = new GridMapQueryExecutor(busyLock); - rdcQryExec = new GridReduceQueryExecutor(busyLock); + rdcQryExec = new GridReduceQueryExecutor(qryIdGen, busyLock); mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); @@ -2247,6 +2285,37 @@ public class IgniteH2Indexing implements GridQueryIndexing { return cols; } + + /** {@inheritDoc} */ + @Override public Collection runningQueries(long duration) { + Collection res = new ArrayList<>(); + + res.addAll(runs.values()); + res.addAll(rdcQryExec.longRunningQueries(duration)); + + return res; + } + + /** {@inheritDoc} */ + @Override public void cancelQueries(Collection queries) { + if (!F.isEmpty(queries)) { + for (Long qryId : queries) { + GridRunningQueryInfo run = runs.get(qryId); + + if (run != null) + run.cancel(); + } + + rdcQryExec.cancelQueries(queries); + } + } + + /** {@inheritDoc} */ + @Override public void cancelAllQueries() { + for (Connection conn : conns) + U.close(conn, log); + } + /** * Wrapper to store connection and flag is schema set or not. */ @@ -3157,10 +3226,4 @@ public class IgniteH2Indexing implements GridQueryIndexing { lastUsage = U.currentTimeMillis(); } } - - /** {@inheritDoc} */ - @Override public void cancelAllQueries() { - for (Connection conn : conns) - U.close(conn, log); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 09952cf..e164315 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -174,7 +174,7 @@ public class GridSqlQuerySplitter { qry = collectAllTables(qry, schemas, tbls); // Build resulting two step query. - GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls); + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(qry.getSQL(), schemas, tbls); // Map query will be direct reference to the original query AST. // Thus all the modifications will be performed on the original AST, so we should be careful when @@ -958,4 +958,4 @@ public class GridSqlQuerySplitter { private static GridSqlFunction function(GridSqlFunctionType type) { return new GridSqlFunction(type); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index cbe1599..f54fab6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -98,6 +99,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; +import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE; /** @@ -120,7 +122,7 @@ public class GridReduceQueryExecutor { private IgniteLogger log; /** */ - private final AtomicLong reqIdGen = new AtomicLong(); + private final AtomicLong qryIdGen; /** */ private final ConcurrentMap runs = new ConcurrentHashMap8<>(); @@ -167,9 +169,11 @@ public class GridReduceQueryExecutor { }; /** + * @param qryIdGen Query ID generator. * @param busyLock Busy lock. */ - public GridReduceQueryExecutor(GridSpinBusyLock busyLock) { + public GridReduceQueryExecutor(AtomicLong qryIdGen, GridSpinBusyLock busyLock) { + this.qryIdGen = qryIdGen; this.busyLock = busyLock; } @@ -493,11 +497,13 @@ public class GridReduceQueryExecutor { } } - final long qryReqId = reqIdGen.incrementAndGet(); + final long qryReqId = qryIdGen.incrementAndGet(); final String space = cctx.name(); - final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize()); + final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), space, + h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize(), + U.currentTimeMillis(), cancel); AffinityTopologyVersion topVer = h2.readyTopologyVersion(); @@ -1303,10 +1309,46 @@ public class GridReduceQueryExecutor { } /** + * Collect queries that already running more than specified duration. + * + * @param duration Duration to check. + * @return Collection of IDs and statements of long running queries. + */ + public Collection longRunningQueries(long duration) { + Collection res = new ArrayList<>(); + + long curTime = U.currentTimeMillis(); + + for (QueryRun run : runs.values()) { + if (run.qry.longQuery(curTime, duration)) + res.add(run.qry); + } + + return res; + } + + /** + * Cancel specified queries. + * + * @param queries Queries IDs to cancel. + */ + public void cancelQueries(Collection queries) { + for (Long qryId : queries) { + QueryRun run = runs.get(qryId); + + if (run != null) + run.qry.cancel(); + } + } + + /** * Query run. */ private static class QueryRun { /** */ + private final GridRunningQueryInfo qry; + + /** */ private final List idxs; /** */ @@ -1322,11 +1364,17 @@ public class GridReduceQueryExecutor { private final AtomicReference state = new AtomicReference<>(); /** + * @param id Query ID. + * @param qry Query text. + * @param cache Cache where query was executed. * @param conn Connection. * @param idxsCnt Number of indexes. * @param pageSize Page size. + * @param startTime Start time. + * @param cancel Query cancel handler. */ - private QueryRun(Connection conn, int idxsCnt, int pageSize) { + private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) { + this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false); this.conn = (JdbcConnection)conn; this.idxs = new ArrayList<>(idxsCnt); this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE; @@ -1384,4 +1432,4 @@ public class GridReduceQueryExecutor { return copy(msg, n, partsMap); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java index e47e893..66e7e4a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java @@ -17,15 +17,23 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; +import java.util.Collections; import java.util.List; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -54,6 +62,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { cc.setCopyOnRead(true); cc.setIndexedTypes(Integer.class, Value.class); + cc.setSqlFunctionClasses(TestSQLFunctions.class); cfg.setCacheConfiguration(cc); @@ -72,7 +81,7 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { IgniteCache cache = grid(0).cache(null); for (int i = 0; i < KEYS; i++) - cache.put(i, new Value("before")); + cache.put(i, new Value(i, "before-" + i)); } /** {@inheritDoc} */ @@ -195,17 +204,148 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { check(cache); } - /** */ - private static class Value { - /** */ - private String str; + /** + * Run specified query in separate thread. + * + * @param qry Query to execute. + */ + private IgniteInternalFuture runQueryAsync(final Query qry) throws Exception { + return multithreadedAsync(new Runnable() { + @Override public void run() { + try { + log.info(">>> Query started"); + + grid(0).cache(null).query(qry).getAll(); + + log.info(">>> Query finished"); + } + catch (Throwable e) { + e.printStackTrace(); + } + } + }, 1, "run-query"); + } - /** - * @param str String. - */ - public Value(String str) { - this.str = str; + /** + * Test collecting info about running. + * + * @throws Exception If failed. + */ + public void testRunningSqlFieldsQuery() throws Exception { + IgniteInternalFuture fut = runQueryAsync(new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3")); + + Thread.sleep(500); + + GridQueryProcessor qryProc = grid(0).context().query(); + + Collection queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + + SqlFieldsQuery qry = new SqlFieldsQuery("select _val, sleep(1000) from Value limit 3"); + qry.setLocal(true); + + fut = runQueryAsync(qry); + + Thread.sleep(500); + + queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + } + + /** + * Test collecting info about running. + * + * @throws Exception If failed. + */ + public void testRunningSqlQuery() throws Exception { + IgniteInternalFuture fut = runQueryAsync(new SqlQuery(Value.class, "id > sleep(100)")); + + Thread.sleep(500); + + GridQueryProcessor qryProc = grid(0).context().query(); + + Collection queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + + SqlQuery qry = new SqlQuery<>(Value.class, "id > sleep(100)"); + qry.setLocal(true); + + fut = runQueryAsync(qry); + + Thread.sleep(500); + + queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + fut.get(); + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); + } + + /** + * Test collecting info about running. + * + * @throws Exception If failed. + */ + public void testCancelingSqlFieldsQuery() throws Exception { + runQueryAsync(new SqlFieldsQuery("select * from (select _val, sleep(100) from Value limit 50)")); + + Thread.sleep(500); + + final GridQueryProcessor qryProc = grid(0).context().query(); + + Collection queries = qryProc.runningQueries(0); + + assertEquals(1, queries.size()); + + final Collection finalQueries = queries; + + for (GridRunningQueryInfo query : finalQueries) + qryProc.cancelQueries(Collections.singleton(query.id())); + + int n = 100; + + // Give cluster some time to cancel query and cleanup resources. + while (n > 0) { + Thread.sleep(100); + + queries = qryProc.runningQueries(0); + + if (queries.isEmpty()) + break; + + log.info(">>>> Wait for cancel: " + n); + + n--; } + + queries = qryProc.runningQueries(0); + + assertEquals(0, queries.size()); } /** @@ -218,9 +358,53 @@ public class CacheSqlQueryValueCopySelfTest extends GridCommonAbstractTest { for (Cache.Entry entry : cache) { cnt++; - assertEquals("before", entry.getValue().str); + assertEquals("before-" + entry.getKey(), entry.getValue().str); } assertEquals(KEYS, cnt); } -} \ No newline at end of file + + /** */ + private static class Value { + /** */ + @QuerySqlField + private int id; + + /** */ + @QuerySqlField + private String str; + + /** + * @param id ID. + * @param str String. + */ + public Value(int id, String str) { + this.id = id; + this.str = str; + } + } + + /** + * Utility class with custom SQL functions. + */ + public static class TestSQLFunctions { + /** + * Sleep function to simulate long running queries. + * + * @param x Time to sleep. + * @return Return specified argument. + */ + @QuerySqlFunction + public static long sleep(long x) { + if (x >= 0) + try { + Thread.sleep(x); + } + catch (InterruptedException ignored) { + // No-op. + } + + return x; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index 1f10593..01fefa3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -477,4 +477,4 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { return storeId; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/49237343/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index 12da5f7..52084c7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -106,6 +106,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract spi.registerCache(null, cacheCfg("B")); } + /** + * @param name Name. + */ private CacheConfiguration cacheCfg(String name) { CacheConfiguration cfg = new CacheConfiguration<>(); @@ -114,6 +117,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract return cfg; } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { idx.stop(); @@ -182,6 +186,9 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract return idx; } + /** + * @return {@code true} if OFF-HEAP mode should be tested. + */ protected boolean offheap() { return false; }