Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 09F68200BCB for ; Wed, 9 Nov 2016 09:38:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 08AF4160AFD; Wed, 9 Nov 2016 08:38:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ECAC8160B36 for ; Wed, 9 Nov 2016 09:38:17 +0100 (CET) Received: (qmail 24913 invoked by uid 500); 9 Nov 2016 08:38:17 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 24096 invoked by uid 99); 9 Nov 2016 08:38:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Nov 2016 08:38:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A6748EF9A0; Wed, 9 Nov 2016 08:38:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Wed, 09 Nov 2016 08:38:57 -0000 Message-Id: <8863551af558465b879fe0b7a229db96@git.apache.org> In-Reply-To: <1dfc6b9b86bf4c25ba1c9c3614011467@git.apache.org> References: <1dfc6b9b86bf4c25ba1c9c3614011467@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [43/50] [abbrv] ignite git commit: Fixes after merge. archived-at: Wed, 09 Nov 2016 08:38:22 -0000 Fixes after merge. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b9abe8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b9abe8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b9abe8 Branch: refs/heads/master Commit: b8b9abe863ed8139553a9ad7013dfad5a363b4da Parents: 5fac786 Author: devozerov Authored: Mon Oct 31 21:31:22 2016 +0300 Committer: thatcoach Committed: Mon Oct 31 21:59:09 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/cache/query/SqlQuery.java | 5 +- .../processors/cache/QueryCursorImpl.java | 18 +- .../closure/GridClosureProcessor.java | 1 - .../processors/query/GridQueryCancel.java | 60 ++-- .../twostep/messages/GridQueryFailResponse.java | 13 +- .../junits/GridTestKernalContext.java | 14 +- .../query/h2/twostep/GridMergeIndex.java | 12 +- .../h2/twostep/msg/GridH2QueryRequest.java | 42 ++- ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 57 ++-- ...cheQueryAbstractDistributedJoinSelfTest.java | 290 ++++++++++++++++++- ...QueryNodeRestartDistributedJoinSelfTest.java | 262 +---------------- .../IgniteCacheQueryNodeRestartSelfTest2.java | 2 +- ...nCancelOrTimeoutDistributedJoinSelfTest.java | 137 ++++++++- .../IgniteCacheQuerySelfTestSuite.java | 4 +- .../IgniteCacheQuerySelfTestSuite2.java | 2 + 15 files changed, 564 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java index 83e171d..3b8fe6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java @@ -17,14 +17,15 @@ package org.apache.ignite.cache.query; -import java.util.concurrent.TimeUnit; -import javax.cache.Cache; import org.apache.ignite.IgniteCache; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; +import javax.cache.Cache; +import java.util.concurrent.TimeUnit; + /** * SQL Query. * http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index f68426e..f93a747 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -17,11 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.QueryCancelledException; @@ -29,10 +24,13 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; -import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.CLOSED; -import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.EXECUTION; -import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.IDLE; -import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.RESULT_READY; +import javax.cache.CacheException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.*; /** * Query cursor implementation. @@ -40,7 +38,7 @@ import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State. public class QueryCursorImpl implements QueryCursorEx { /** */ private final static AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state"); + AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state"); /** Query executor. */ private Iterable iterExec; http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 252540e..9d295d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java index 47f1208..7391f39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java @@ -18,49 +18,57 @@ package org.apache.ignite.internal.processors.query; import org.apache.ignite.cache.query.QueryCancelledException; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * Holds query cancel state. */ public class GridQueryCancel { - /** */ - private volatile boolean cancelled; + /** No-op runnable indicating cancelled state. */ + private static final Runnable CANCELLED = new Runnable() { + @Override public void run() { + // No-op. + } + }; /** */ - private volatile boolean completed; + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(GridQueryCancel.class, Runnable.class, "clo"); /** */ private volatile Runnable clo; /** - * Sets a cancel closure. The closure must be idempotent to multiple invocations. + * Sets a cancel closure. * * @param clo Clo. */ - public void set(Runnable clo) throws QueryCancelledException{ - checkCancelled(); + public void set(Runnable clo) throws QueryCancelledException { + assert clo != null; - this.clo = clo; + while(true) { + Runnable tmp = this.clo; + + if (tmp == CANCELLED) + throw new QueryCancelledException(); + + if (STATE_UPDATER.compareAndSet(this, tmp, clo)) + return; + } } /** - * Spins until a query is completed. - * Only one thread can enter this method. - * This is guaranteed by {@link org.apache.ignite.internal.processors.cache.QueryCursorImpl} + * Executes cancel closure. */ public void cancel() { - cancelled = true; - - int attempt = 0; + while(true) { + Runnable tmp = this.clo; - while (!completed) { - if (clo != null) clo.run(); + if (STATE_UPDATER.compareAndSet(this, tmp, CANCELLED)) { + if (tmp != null) + tmp.run(); - try { - U.sleep(++attempt * 10); - } catch (IgniteInterruptedCheckedException ignored) { return; } } @@ -69,16 +77,8 @@ public class GridQueryCancel { /** * Stops query execution if a user requested cancel. */ - public void checkCancelled() throws QueryCancelledException{ - if (cancelled) + public void checkCancelled() throws QueryCancelledException { + if (clo == CANCELLED) throw new QueryCancelledException(); } - - /** - * Sets completed state. - * The method must be called then a query is completed by any reason, typically in final block. - */ - public void setCompleted() { - completed = true; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java index 261241e..7554ae9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; -import java.nio.ByteBuffer; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import java.nio.ByteBuffer; + /** * Error message. */ @@ -113,13 +114,13 @@ public class GridQueryFailResponse implements Message { writer.incrementState(); case 1: - if (!writer.writeByte("failCode", failCode)) + if (!writer.writeLong("qryReqId", qryReqId)) return false; writer.incrementState(); case 2: - if (!writer.writeLong("qryReqId", qryReqId)) + if (!writer.writeByte("failCode", failCode)) return false; writer.incrementState(); @@ -146,7 +147,7 @@ public class GridQueryFailResponse implements Message { reader.incrementState(); case 1: - failCode = reader.readByte("failCode"); + qryReqId = reader.readLong("qryReqId"); if (!reader.isLastRead()) return false; @@ -154,7 +155,7 @@ public class GridQueryFailResponse implements Message { reader.incrementState(); case 2: - qryReqId = reader.readLong("qryReqId"); + failCode = reader.readByte("failCode"); if (!reader.isLastRead()) return false; @@ -175,4 +176,4 @@ public class GridQueryFailResponse implements Message { @Override public byte fieldsCount() { return 3; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index cba67e0..03138c3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -17,21 +17,18 @@ package org.apache.ignite.testframework.junits; -import java.util.List; -import java.util.ListIterator; -import java.util.concurrent.ExecutorService; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.GridComponent; -import org.apache.ignite.internal.GridKernalContextImpl; -import org.apache.ignite.internal.GridKernalGatewayImpl; -import org.apache.ignite.internal.GridLoggerProxy; -import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.ExecutorService; + /** * Test context. */ @@ -62,6 +59,7 @@ public class GridTestKernalContext extends GridKernalContextImpl { null, null, null, + null, U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 796ea66..444ea82 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.ArrayList; import java.util.Collection; import java.util.ConcurrentModificationException; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -113,7 +112,7 @@ public abstract class GridMergeIndex extends BaseIndex { protected final void checkSourceNodesAlive() { for (UUID nodeId : sources()) { if (!ctx.discovery().alive(nodeId)) { - fail(nodeId); + fail(nodeId, null); return; } @@ -174,11 +173,18 @@ public abstract class GridMergeIndex extends BaseIndex { /** * @param nodeId Node ID. */ - public void fail(UUID nodeId) { + public void fail(UUID nodeId, final CacheException e) { addPage0(new GridResultPage(null, nodeId, null) { @Override public boolean isFail() { return true; } + + @Override public void fetchNextPage() { + if (e == null) + super.fetchNextPage(); + else + throw e; + } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java index dc82b2c..884173f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java @@ -45,8 +45,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { private static final long serialVersionUID = 0L; /** - * Map query will not destroy context until explicit query cancel request - * will be received because distributed join requests can be received. + * Map query will not destroy context until explicit query cancel request will be received because distributed join + * requests can be received. */ public static int FLAG_DISTRIBUTED_JOINS = 1; @@ -82,6 +82,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { @GridDirectCollection(String.class) private Collection tbls; + /** */ + private int timeout; + /** * @param tbls Tables. * @return {@code this}. @@ -153,7 +156,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** * @return Explicit partitions mapping. */ - public Map partitions() { + public Map partitions() { return parts; } @@ -161,7 +164,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { * @param parts Explicit partitions mapping. * @return {@code this}. */ - public GridH2QueryRequest partitions(Map parts) { + public GridH2QueryRequest partitions(Map parts) { this.parts = parts; return this; @@ -219,6 +222,23 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { return (this.flags & flags) == flags; } + /** + * @return Timeout. + */ + public int timeout() { + return timeout; + } + + /** + * @param timeout New timeout. + * @return {@code this}. + */ + public GridH2QueryRequest timeout(int timeout) { + this.timeout = timeout; + + return this; + } + /** {@inheritDoc} */ @Override public void marshall(Marshaller m) { if (F.isEmpty(qrys)) @@ -297,6 +317,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { writer.incrementState(); + case 8: + if (!writer.writeInt("timeout", timeout)) + return false; + + writer.incrementState(); } return true; @@ -374,6 +399,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { reader.incrementState(); + case 8: + timeout = reader.readInt("timeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridH2QueryRequest.class); @@ -386,7 +418,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 9; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java index 0f60db2..a92bf2b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -43,7 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; */ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends GridCommonAbstractTest { /** Grids count. */ - private static final int GRIDS_COUNT = 3; + private static final int GRIDS_CNT = 3; /** IP finder. */ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); @@ -55,19 +56,19 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr public static final int VAL_SIZE = 16; /** */ - private static final String QUERY_1 = "select a._val, b._val from String a, String b"; + private static final String QRY_1 = "select a._val, b._val from String a, String b"; /** */ - private static final String QUERY_2 = "select a._key, count(*) from String a group by a._key"; + private static final String QRY_2 = "select a._key, count(*) from String a group by a._key"; /** */ - private static final String QUERY_3 = "select a._val from String a"; + private static final String QRY_3 = "select a._val from String a"; /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - startGridsMultiThreaded(GRIDS_COUNT); + startGridsMultiThreaded(GRIDS_CNT); } /** {@inheritDoc} */ @@ -97,82 +98,82 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr /** */ public void testRemoteQueryExecutionTimeout() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true); } /** */ public void testRemoteQueryWithMergeTableTimeout() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 500, TimeUnit.MILLISECONDS, true); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true); } /** */ public void testRemoteQueryExecutionCancel0() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryExecutionCancel1() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryExecutionCancel2() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 1, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryExecutionCancel3() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel0() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel1() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel2() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 1_500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithMergeTableCancel3() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel0() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 1, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel1() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 500, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel2() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 1_000, TimeUnit.MILLISECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false); } /** */ public void testRemoteQueryWithoutMergeTableCancel3() throws Exception { - testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 3, TimeUnit.SECONDS, false); + testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); } /** */ public void testRemoteQueryAlreadyFinishedStop() throws Exception { - testQuery(100, VAL_SIZE, QUERY_3, 3, TimeUnit.SECONDS, false); + testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false); } /** */ - private void testQuery(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit, - boolean timeout) throws Exception { + private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit, + boolean timeout) throws Exception { try (Ignite client = startGrid("client")) { IgniteCache cache = client.cache(null); @@ -230,19 +231,23 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr /** * Validates clean state on all participating nodes after query cancellation. */ + @SuppressWarnings("unchecked") private void checkCleanState() { - for (int i = 0; i < GRIDS_COUNT; i++) { + for (int i = 0; i < GRIDS_CNT; i++) { IgniteEx grid = grid(i); // Validate everything was cleaned up. - ConcurrentMap> map = U.field(((IgniteH2Indexing)U.field((Object)U.field( + ConcurrentMap map = U.field(((IgniteH2Indexing)U.field(U.field( grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess"); String msg = "Map executor state is not cleared"; // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query. - for (ConcurrentMap results : map.values()) - assertEquals(msg, 0, results.size()); + for (Object result : map.values()) { + Map m = U.field(result, "res"); + + assertEquals(msg, 0, m.size()); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java index be34a09..339e0d3 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java @@ -1,7 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.processors.cache.distributed.near; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.io.Serializable; +import java.util.Random; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + /** - * Created by vozerov on 31.10.2016. + * Test for distributed queries with node restarts. */ -public class IgniteCacheQueryAbstractDistributedJoinSelfTest { -} +public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends GridCommonAbstractTest { + /** */ + protected static final String QRY_0 = "select co._key, count(*) cnt\n" + + "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + + "group by co._key order by cnt desc, co._key"; + + /** */ + protected static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" + + "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" + + "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + + "group by co._key order by cnt desc, co._key"; + + /** */ + protected static final String QRY_1 = "select pr._key, co._key\n" + + "from \"pr\".Product pr, \"co\".Company co\n" + + "where pr.companyId = co._key\n" + + "order by co._key, pr._key "; + + /** */ + protected static final String QRY_1_BROADCAST = "select pr._key, co._key\n" + + "from \"co\".Company co, \"pr\".Product pr \n" + + "where pr.companyId = co._key\n" + + "order by co._key, pr._key "; + + /** */ + protected static final int GRID_CNT = 2; + + /** */ + private static final int PERS_CNT = 600; + + /** */ + private static final int PURCHASE_CNT = 6_000; + + /** */ + private static final int COMPANY_CNT = 25; + + /** */ + private static final int PRODUCT_CNT = 100; + + /** */ + private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + if ("client".equals(gridName)) + c.setClientMode(true); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + int i = 0; + + CacheConfiguration[] ccs = new CacheConfiguration[4]; + + for (String name : F.asList("pe", "pu", "co", "pr")) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setName(name); + cc.setCacheMode(PARTITIONED); + cc.setBackups(2); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setRebalanceMode(SYNC); + cc.setLongQueryWarningTimeout(15_000); + cc.setAffinity(new RendezvousAffinityFunction(false, 60)); + + switch (name) { + case "pe": + cc.setIndexedTypes( + Integer.class, Person.class + ); + + break; + + case "pu": + cc.setIndexedTypes( + Integer.class, Purchase.class + ); + + break; + + case "co": + cc.setIndexedTypes( + Integer.class, Company.class + ); + + break; + + case "pr": + cc.setIndexedTypes( + Integer.class, Product.class + ); + + break; + } + + ccs[i++] = cc; + } + + c.setCacheConfiguration(ccs); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(GRID_CNT); + + fillCaches(); + } + + /** + * + */ + private void fillCaches() { + IgniteCache co = grid(0).cache("co"); + + for (int i = 0; i < COMPANY_CNT; i++) + co.put(i, new Company(i)); + + IgniteCache pr = grid(0).cache("pr"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < PRODUCT_CNT; i++) + pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); + + IgniteCache pe = grid(0).cache("pe"); + + for (int i = 0; i < PERS_CNT; i++) + pe.put(i, new Person(i)); + + IgniteCache pu = grid(0).cache("pu"); + + for (int i = 0; i < PURCHASE_CNT; i++) { + int persId = rnd.nextInt(PERS_CNT); + int prodId = rnd.nextInt(PRODUCT_CNT); + + pu.put(i, new Purchase(persId, prodId)); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * + */ + protected static class Person implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** + * @param id ID. + */ + Person(int id) { + this.id = id; + } + } + + /** + * + */ + protected static class Purchase implements Serializable { + /** */ + @QuerySqlField(index = true) + int personId; + + /** */ + @QuerySqlField(index = true) + int productId; + + /** + * @param personId Person ID. + * @param productId Product ID. + */ + Purchase(int personId, int productId) { + this.personId = personId; + this.productId = productId; + } + } + + /** + * + */ + protected static class Company implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** + * @param id ID. + */ + Company(int id) { + this.id = id; + } + } + + /** + * + */ + protected static class Product implements Serializable { + /** */ + @QuerySqlField(index = true) + int id; + + /** */ + @QuerySqlField(index = true) + int companyId; + + /** + * @param id ID. + * @param companyId Company ID. + */ + Product(int id, int companyId) { + this.id = id; + this.companyId = companyId; + } + } + + /** */ + public static class Functions { + /** */ + @QuerySqlFunction + public static int sleep() { + try { + U.sleep(1_000); + } catch (IgniteInterruptedCheckedException ignored) { + // No-op. + } + + return 0; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java index 0e6806f..ced28bc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java @@ -17,185 +17,25 @@ package org.apache.ignite.internal.processors.cache.distributed.near; -import java.io.Serializable; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerArray; -import javax.cache.CacheException; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.GridRandom; import org.apache.ignite.internal.util.typedef.CAX; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import javax.cache.CacheException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; /** * Test for distributed queries with node restarts. */ -public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridCommonAbstractTest { - /** */ - private static final String QRY_0 = "select co._key, count(*) cnt\n" + - "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" + - "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + - "group by co._key order by cnt desc, co._key"; - - /** */ - private static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" + - "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" + - "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" + - "group by co._key order by cnt desc, co._key"; - - /** */ - private static final String QRY_1 = "select pr._key, co._key\n" + - "from \"pr\".Product pr, \"co\".Company co\n" + - "where pr.companyId = co._key\n" + - "order by co._key, pr._key "; - - /** */ - private static final String QRY_1_BROADCAST = "select pr._key, co._key\n" + - "from \"co\".Company co, \"pr\".Product pr \n" + - "where pr.companyId = co._key\n" + - "order by co._key, pr._key "; - - /** */ - private static final int GRID_CNT = 6; - - /** */ - private static final int PERS_CNT = 600; - - /** */ - private static final int PURCHASE_CNT = 6000; - - /** */ - private static final int COMPANY_CNT = 25; - - /** */ - private static final int PRODUCT_CNT = 100; - - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - c.setDiscoverySpi(disco); - - int i = 0; - - CacheConfiguration[] ccs = new CacheConfiguration[4]; - - for (String name : F.asList("pe", "pu", "co", "pr")) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setName(name); - cc.setCacheMode(PARTITIONED); - cc.setBackups(2); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setRebalanceMode(SYNC); - cc.setLongQueryWarningTimeout(15_000); - cc.setAffinity(new RendezvousAffinityFunction(false, 60)); - - switch (name) { - case "pe": - cc.setIndexedTypes( - Integer.class, Person.class - ); - - break; - - case "pu": - cc.setIndexedTypes( - Integer.class, Purchase.class - ); - - break; - - case "co": - cc.setIndexedTypes( - Integer.class, Company.class - ); - - break; - - case "pr": - cc.setIndexedTypes( - Integer.class, Product.class - ); - - break; - } - - ccs[i++] = cc; - } - - c.setCacheConfiguration(ccs); - - return c; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(GRID_CNT); - - fillCaches(); - } - - /** - * - */ - private void fillCaches() { - IgniteCache co = grid(0).cache("co"); - - for (int i = 0; i < COMPANY_CNT; i++) - co.put(i, new Company(i)); - - IgniteCache pr = grid(0).cache("pr"); - - Random rnd = new GridRandom(); - - for (int i = 0; i < PRODUCT_CNT; i++) - pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT))); - - IgniteCache pe = grid(0).cache("pe"); - - for (int i = 0; i < PERS_CNT; i++) - pe.put(i, new Person(i)); - - IgniteCache pu = grid(0).cache("pu"); - - for (int i = 0; i < PURCHASE_CNT; i++) { - int persId = rnd.nextInt(PERS_CNT); - int prodId = rnd.nextInt(PRODUCT_CNT); - - pu.put(i, new Purchase(persId, prodId)); - } - } +public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest { /** * @throws Exception If failed. */ @@ -319,13 +159,6 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm else { IgniteCache cache = grid(g).cache("co"); - SqlFieldsQuery qry; - - if (broadcastQry) - qry = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true); - else - qry = new SqlFieldsQuery(QRY_1).setDistributedJoins(true); - assertEquals(rRes, cache.query(qry1).getAll()); } @@ -392,85 +225,4 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm info("Stopped."); } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * - */ - private static class Person implements Serializable { - /** */ - @QuerySqlField(index = true) - int id; - - /** - * @param id ID. - */ - Person(int id) { - this.id = id; - } - } - - /** - * - */ - private static class Purchase implements Serializable { - /** */ - @QuerySqlField(index = true) - int personId; - - /** */ - @QuerySqlField(index = true) - int productId; - - /** - * @param personId Person ID. - * @param productId Product ID. - */ - Purchase(int personId, int productId) { - this.personId = personId; - this.productId = productId; - } - } - - /** - * - */ - private static class Company implements Serializable { - /** */ - @QuerySqlField(index = true) - int id; - - /** - * @param id ID. - */ - Company(int id) { - this.id = id; - } - } - - /** - * - */ - private static class Product implements Serializable { - /** */ - @QuerySqlField(index = true) - int id; - - /** */ - @QuerySqlField(index = true) - int companyId; - - /** - * @param id ID. - * @param companyId Company ID. - */ - Product(int id, int companyId) { - this.id = id; - this.companyId = companyId; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java index 8b33a46..154daa0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java @@ -267,7 +267,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest continue; if (th.getMessage() != null && - th.getMessage().startsWith("Failed to fetch data from node:")) { + th.getMessage().startsWith("Failed to fetch data from node:")) { failedOnRemoteFetch = true; break; http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java index 80bd62e..4baaf8f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java @@ -1,7 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.QueryCancelledException; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.typedef.internal.U; + /** - * Created by vozerov on 31.10.2016. + * Test for cancel of query containing distributed joins. */ -public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest { -} +public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest { + /** */ + public void testCancel1() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false); + } + + /** */ + public void testCancel2() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false); + } + + /** */ + public void testCancel3() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false); + } + + /** */ + public void testCancel4() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false); + } + + /** */ + public void testTimeout1() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true); + } + + /** */ + public void testTimeout2() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true); + } + + /** */ + public void testTimeout3() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true); + } + + /** */ + public void testTimeout4() throws Exception { + testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true); + } + + /** */ + private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit, + boolean timeout) throws Exception { + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true); + + IgniteCache cache = ignite.cache(cacheName); + + final QueryCursor> cursor; + if (timeout) { + qry.setTimeout(timeoutUnits, timeUnit); + + cursor = cache.query(qry); + } else { + cursor = cache.query(qry); + + ignite.scheduler().runLocal(new Runnable() { + @Override public void run() { + cursor.close(); + } + }, timeoutUnits, timeUnit); + } + + try (QueryCursor> ignored = cursor) { + cursor.iterator(); + } + catch (CacheException ex) { + log().error("Got expected exception", ex); + + assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException); + } + + // Give some time to clean up. + Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000); + + checkCleanState(); + } + + /** + * Validates clean state on all participating nodes after query cancellation. + */ + @SuppressWarnings("unchecked") + private void checkCleanState() { + for (int i = 0; i < GRID_CNT; i++) { + IgniteEx grid = grid(i); + + // Validate everything was cleaned up. + ConcurrentMap map = U.field(((IgniteH2Indexing) U.field(U.field( + grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess"); + + String msg = "Map executor state is not cleared"; + + // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query. + for (Object result : map.values()) { + Map m = U.field(result, "res"); + + assertEquals(msg, 0, m.size()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index b7e6403..7f98d0a 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -63,7 +63,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeFailTest; -import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest; @@ -125,7 +125,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheOffheapIndexScanTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); - suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class); + suite.addTestSuite(IgniteCacheQueryAbstractDistributedJoinSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeFailTest.class); suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java index 5722c01..be7523f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheD import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; @@ -100,6 +101,7 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite { suite.addTestSuite(IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.class); suite.addTestSuite(IgniteCacheDistributedQueryCancelSelfTest.class); suite.addTestSuite(IgniteCacheLocalQueryCancelOrTimeoutSelfTest.class); + suite.addTestSuite(IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class); // Other. suite.addTestSuite(CacheQueryNewClientSelfTest.class);