ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [43/50] [abbrv] ignite git commit: Fixes after merge.
Date Wed, 09 Nov 2016 08:38:57 GMT
Fixes after merge.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8b9abe8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8b9abe8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8b9abe8

Branch: refs/heads/master
Commit: b8b9abe863ed8139553a9ad7013dfad5a363b4da
Parents: 5fac786
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Oct 31 21:31:22 2016 +0300
Committer: thatcoach <ppozerov@list.ru>
Committed: Mon Oct 31 21:59:09 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/cache/query/SqlQuery.java |   5 +-
 .../processors/cache/QueryCursorImpl.java       |  18 +-
 .../closure/GridClosureProcessor.java           |   1 -
 .../processors/query/GridQueryCancel.java       |  60 ++--
 .../twostep/messages/GridQueryFailResponse.java |  13 +-
 .../junits/GridTestKernalContext.java           |  14 +-
 .../query/h2/twostep/GridMergeIndex.java        |  12 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |  42 ++-
 ...butedQueryStopOnCancelOrTimeoutSelfTest.java |  57 ++--
 ...cheQueryAbstractDistributedJoinSelfTest.java | 290 ++++++++++++++++++-
 ...QueryNodeRestartDistributedJoinSelfTest.java | 262 +----------------
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   2 +-
 ...nCancelOrTimeoutDistributedJoinSelfTest.java | 137 ++++++++-
 .../IgniteCacheQuerySelfTestSuite.java          |   4 +-
 .../IgniteCacheQuerySelfTestSuite2.java         |   2 +
 15 files changed, 564 insertions(+), 355 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index 83e171d..3b8fe6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.cache.query;
 
-import java.util.concurrent.TimeUnit;
-import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import javax.cache.Cache;
+import java.util.concurrent.TimeUnit;
+
 /**
  * SQL Query.
  *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index f68426e..f93a747 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -17,11 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.QueryCancelledException;
@@ -29,10 +24,13 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 
-import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.CLOSED;
-import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.EXECUTION;
-import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.IDLE;
-import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.RESULT_READY;
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.*;
 
 /**
  * Query cursor implementation.
@@ -40,7 +38,7 @@ import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.
 public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     /** */
     private final static AtomicReferenceFieldUpdater<QueryCursorImpl, State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");
+        AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");
 
     /** Query executor. */
     private Iterable<T> iterExec;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 252540e..9d295d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
index 47f1208..7391f39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
@@ -18,49 +18,57 @@
 package org.apache.ignite.internal.processors.query;
 
 import org.apache.ignite.cache.query.QueryCancelledException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
  * Holds query cancel state.
  */
 public class GridQueryCancel {
-    /** */
-    private volatile boolean cancelled;
+    /** No-op runnable indicating cancelled state. */
+    private static final Runnable CANCELLED = new Runnable() {
+        @Override public void run() {
+            // No-op.
+        }
+    };
 
     /** */
-    private volatile boolean completed;
+    private static final AtomicReferenceFieldUpdater<GridQueryCancel, Runnable> STATE_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(GridQueryCancel.class, Runnable.class, "clo");
 
     /** */
     private volatile Runnable clo;
 
     /**
-     * Sets a cancel closure. The closure must be idempotent to multiple invocations.
+     * Sets a cancel closure.
      *
      * @param clo Clo.
      */
-    public void set(Runnable clo) throws QueryCancelledException{
-        checkCancelled();
+    public void set(Runnable clo) throws QueryCancelledException {
+        assert clo != null;
 
-        this.clo = clo;
+        while(true) {
+            Runnable tmp = this.clo;
+
+            if (tmp == CANCELLED)
+                throw new QueryCancelledException();
+
+            if (STATE_UPDATER.compareAndSet(this, tmp, clo))
+                return;
+        }
     }
 
     /**
-     * Spins until a query is completed.
-     * Only one thread can enter this method.
-     * This is guaranteed by {@link org.apache.ignite.internal.processors.cache.QueryCursorImpl}
+     * Executes cancel closure.
      */
     public void cancel() {
-        cancelled = true;
-
-        int attempt = 0;
+        while(true) {
+            Runnable tmp = this.clo;
 
-        while (!completed) {
-            if (clo != null) clo.run();
+            if (STATE_UPDATER.compareAndSet(this, tmp, CANCELLED)) {
+                if (tmp != null)
+                    tmp.run();
 
-            try {
-                U.sleep(++attempt * 10);
-            } catch (IgniteInterruptedCheckedException ignored) {
                 return;
             }
         }
@@ -69,16 +77,8 @@ public class GridQueryCancel {
     /**
      * Stops query execution if a user requested cancel.
      */
-    public void checkCancelled() throws QueryCancelledException{
-        if (cancelled)
+    public void checkCancelled() throws QueryCancelledException {
+        if (clo == CANCELLED)
             throw new QueryCancelledException();
     }
-
-    /**
-     * Sets completed state.
-     * The method must be called then a query is completed by any reason, typically in final block.
-     */
-    public void setCompleted() {
-        completed = true;
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index 261241e..7554ae9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
-import java.nio.ByteBuffer;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
+import java.nio.ByteBuffer;
+
 /**
  * Error message.
  */
@@ -113,13 +114,13 @@ public class GridQueryFailResponse implements Message {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeByte("failCode", failCode))
+                if (!writer.writeLong("qryReqId", qryReqId))
                     return false;
 
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeLong("qryReqId", qryReqId))
+                if (!writer.writeByte("failCode", failCode))
                     return false;
 
                 writer.incrementState();
@@ -146,7 +147,7 @@ public class GridQueryFailResponse implements Message {
                 reader.incrementState();
 
             case 1:
-                failCode = reader.readByte("failCode");
+                qryReqId = reader.readLong("qryReqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -154,7 +155,7 @@ public class GridQueryFailResponse implements Message {
                 reader.incrementState();
 
             case 2:
-                qryReqId = reader.readLong("qryReqId");
+                failCode = reader.readByte("failCode");
 
                 if (!reader.isLastRead())
                     return false;
@@ -175,4 +176,4 @@ public class GridQueryFailResponse implements Message {
     @Override public byte fieldsCount() {
         return 3;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index cba67e0..03138c3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -17,21 +17,18 @@
 
 package org.apache.ignite.testframework.junits;
 
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridComponent;
-import org.apache.ignite.internal.GridKernalContextImpl;
-import org.apache.ignite.internal.GridKernalGatewayImpl;
-import org.apache.ignite.internal.GridLoggerProxy;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.ExecutorService;
+
 /**
  * Test context.
  */
@@ -62,6 +59,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
                 null,
                 null,
                 null,
+                null,
                 U.allPluginProviders());
 
         GridTestUtils.setFieldValue(grid(), "cfg", config());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 796ea66..444ea82 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -113,7 +112,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     protected final void checkSourceNodesAlive() {
         for (UUID nodeId : sources()) {
             if (!ctx.discovery().alive(nodeId)) {
-                fail(nodeId);
+                fail(nodeId, null);
 
                 return;
             }
@@ -174,11 +173,18 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * @param nodeId Node ID.
      */
-    public void fail(UUID nodeId) {
+    public void fail(UUID nodeId, final CacheException e) {
         addPage0(new GridResultPage(null, nodeId, null) {
             @Override public boolean isFail() {
                 return true;
             }
+
+            @Override public void fetchNextPage() {
+                if (e == null)
+                    super.fetchNextPage();
+                else
+                    throw e;
+            }
         });
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index dc82b2c..884173f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -45,8 +45,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     private static final long serialVersionUID = 0L;
 
     /**
-     * Map query will not destroy context until explicit query cancel request
-     * will be received because distributed join requests can be received.
+     * Map query will not destroy context until explicit query cancel request will be received because distributed join
+     * requests can be received.
      */
     public static int FLAG_DISTRIBUTED_JOINS = 1;
 
@@ -82,6 +82,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     @GridDirectCollection(String.class)
     private Collection<String> tbls;
 
+    /** */
+    private int timeout;
+
     /**
      * @param tbls Tables.
      * @return {@code this}.
@@ -153,7 +156,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /**
      * @return Explicit partitions mapping.
      */
-    public Map<UUID,int[]> partitions() {
+    public Map<UUID, int[]> partitions() {
         return parts;
     }
 
@@ -161,7 +164,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      * @param parts Explicit partitions mapping.
      * @return {@code this}.
      */
-    public GridH2QueryRequest partitions(Map<UUID,int[]> parts) {
+    public GridH2QueryRequest partitions(Map<UUID, int[]> parts) {
         this.parts = parts;
 
         return this;
@@ -219,6 +222,23 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
         return (this.flags & flags) == flags;
     }
 
+    /**
+     * @return Timeout.
+     */
+    public int timeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout New timeout.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest timeout(int timeout) {
+        this.timeout = timeout;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public void marshall(Marshaller m) {
         if (F.isEmpty(qrys))
@@ -297,6 +317,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeInt("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -374,6 +399,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
                 reader.incrementState();
 
+            case 8:
+                timeout = reader.readInt("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -386,7 +418,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
index 0f60db2..a92bf2b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -43,7 +44,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  */
 public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends GridCommonAbstractTest {
     /** Grids count. */
-    private static final int GRIDS_COUNT = 3;
+    private static final int GRIDS_CNT = 3;
 
     /** IP finder. */
     private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -55,19 +56,19 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
     public static final int VAL_SIZE = 16;
 
     /** */
-    private static final String QUERY_1 = "select a._val, b._val from String a, String b";
+    private static final String QRY_1 = "select a._val, b._val from String a, String b";
 
     /** */
-    private static final String QUERY_2 = "select a._key, count(*) from String a group by a._key";
+    private static final String QRY_2 = "select a._key, count(*) from String a group by a._key";
 
     /** */
-    private static final String QUERY_3 = "select a._val from String a";
+    private static final String QRY_3 = "select a._val from String a";
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(GRIDS_COUNT);
+        startGridsMultiThreaded(GRIDS_CNT);
     }
 
     /** {@inheritDoc} */
@@ -97,82 +98,82 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
 
     /** */
     public void testRemoteQueryExecutionTimeout() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, true);
     }
 
     /** */
     public void testRemoteQueryWithMergeTableTimeout() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 500, TimeUnit.MILLISECONDS, true);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, true);
     }
 
     /** */
     public void testRemoteQueryExecutionCancel0() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryExecutionCancel1() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 500, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryExecutionCancel2() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 1, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 1, TimeUnit.SECONDS, false);
     }
 
     /** */
     public void testRemoteQueryExecutionCancel3() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_1, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_1, 3, TimeUnit.SECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithMergeTableCancel0() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithMergeTableCancel1() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 500, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithMergeTableCancel2() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 1_500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 1_500, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithMergeTableCancel3() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_2, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_2, 3, TimeUnit.SECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithoutMergeTableCancel0() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 1, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithoutMergeTableCancel1() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 500, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 500, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithoutMergeTableCancel2() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 1_000, TimeUnit.MILLISECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 1_000, TimeUnit.MILLISECONDS, false);
     }
 
     /** */
     public void testRemoteQueryWithoutMergeTableCancel3() throws Exception {
-        testQuery(CACHE_SIZE, VAL_SIZE, QUERY_3, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(CACHE_SIZE, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
     }
 
     /** */
     public void testRemoteQueryAlreadyFinishedStop() throws Exception {
-        testQuery(100, VAL_SIZE, QUERY_3, 3, TimeUnit.SECONDS, false);
+        testQueryCancel(100, VAL_SIZE, QRY_3, 3, TimeUnit.SECONDS, false);
     }
 
     /** */
-    private void testQuery(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
-        boolean timeout) throws Exception {
+    private void testQueryCancel(int keyCnt, int valSize, String sql, int timeoutUnits, TimeUnit timeUnit,
+                                 boolean timeout) throws Exception {
         try (Ignite client = startGrid("client")) {
 
             IgniteCache<Object, Object> cache = client.cache(null);
@@ -230,19 +231,23 @@ public class IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest extends Gr
     /**
      * Validates clean state on all participating nodes after query cancellation.
      */
+    @SuppressWarnings("unchecked")
     private void checkCleanState() {
-        for (int i = 0; i < GRIDS_COUNT; i++) {
+        for (int i = 0; i < GRIDS_CNT; i++) {
             IgniteEx grid = grid(i);
 
             // Validate everything was cleaned up.
-            ConcurrentMap<UUID, ConcurrentMap<Long, ?>> map = U.field(((IgniteH2Indexing)U.field((Object)U.field(
+            ConcurrentMap<UUID, ?> map = U.field(((IgniteH2Indexing)U.field(U.field(
                 grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess");
 
             String msg = "Map executor state is not cleared";
 
             // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
-            for (ConcurrentMap<Long, ?> results : map.values())
-                assertEquals(msg, 0, results.size());
+            for (Object result : map.values()) {
+                Map<Long, ?> m = U.field(result, "res");
+
+                assertEquals(msg, 0, m.size());
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index be34a09..339e0d3 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@ -1,7 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.Serializable;
+import java.util.Random;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
 /**
- * Created by vozerov on 31.10.2016.
+ * Base class for distributed join query tests: configures the caches, starts the grids and loads test data.
  */
-public class IgniteCacheQueryAbstractDistributedJoinSelfTest {
-}
+public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends GridCommonAbstractTest {
+    /** */
+    protected static final String QRY_0 = "select co._key, count(*) cnt\n" +
+        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
+
+    /** */
+    protected static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" +
+        "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
+
+    /** */
+    protected static final String QRY_1 = "select pr._key, co._key\n" +
+        "from \"pr\".Product pr, \"co\".Company co\n" +
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
+
+    /** */
+    protected static final String QRY_1_BROADCAST = "select pr._key, co._key\n" +
+        "from \"co\".Company co, \"pr\".Product pr \n" +
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
+
+    /** */
+    protected static final int GRID_CNT = 2;
+
+    /** */
+    private static final int PERS_CNT = 600;
+
+    /** */
+    private static final int PURCHASE_CNT = 6_000;
+
+    /** */
+    private static final int COMPANY_CNT = 25;
+
+    /** */
+    private static final int PRODUCT_CNT = 100;
+
+    /** */
+    private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        if ("client".equals(gridName))
+            c.setClientMode(true);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        c.setDiscoverySpi(disco);
+
+        int i = 0;
+
+        CacheConfiguration<?, ?>[] ccs = new CacheConfiguration[4];
+
+        for (String name : F.asList("pe", "pu", "co", "pr")) {
+            CacheConfiguration<?, ?> cc = defaultCacheConfiguration();
+
+            cc.setName(name);
+            cc.setCacheMode(PARTITIONED);
+            cc.setBackups(2);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setRebalanceMode(SYNC);
+            cc.setLongQueryWarningTimeout(15_000);
+            cc.setAffinity(new RendezvousAffinityFunction(false, 60));
+
+            switch (name) {
+                case "pe":
+                    cc.setIndexedTypes(
+                            Integer.class, Person.class
+                    );
+
+                    break;
+
+                case "pu":
+                    cc.setIndexedTypes(
+                            Integer.class, Purchase.class
+                    );
+
+                    break;
+
+                case "co":
+                    cc.setIndexedTypes(
+                            Integer.class, Company.class
+                    );
+
+                    break;
+
+                case "pr":
+                    cc.setIndexedTypes(
+                            Integer.class, Product.class
+                    );
+
+                    break;
+            }
+
+            ccs[i++] = cc;
+        }
+
+        c.setCacheConfiguration(ccs);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        fillCaches();
+    }
+
+    /**
+     * Fills the "co", "pr", "pe" and "pu" caches with companies, products, persons and purchases.
+     */
+    private void fillCaches() {
+        IgniteCache<Integer, Company> co = grid(0).cache("co");
+
+        for (int i = 0; i < COMPANY_CNT; i++)
+            co.put(i, new Company(i));
+
+        IgniteCache<Integer, Product> pr = grid(0).cache("pr");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < PRODUCT_CNT; i++)
+            pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
+
+        IgniteCache<Integer, Person> pe = grid(0).cache("pe");
+
+        for (int i = 0; i < PERS_CNT; i++)
+            pe.put(i, new Person(i));
+
+        IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");
+
+        for (int i = 0; i < PURCHASE_CNT; i++) {
+            int persId = rnd.nextInt(PERS_CNT);
+            int prodId = rnd.nextInt(PRODUCT_CNT);
+
+            pu.put(i, new Purchase(persId, prodId));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     *
+     */
+    protected static class Person implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id ID.
+         */
+        Person(int id) {
+            this.id = id;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class Purchase implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        int personId;
+
+        /** */
+        @QuerySqlField(index = true)
+        int productId;
+
+        /**
+         * @param personId Person ID.
+         * @param productId Product ID.
+         */
+        Purchase(int personId, int productId) {
+            this.personId = personId;
+            this.productId = productId;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class Company implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param id ID.
+         */
+        Company(int id) {
+            this.id = id;
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class Product implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        int id;
+
+        /** */
+        @QuerySqlField(index = true)
+        int companyId;
+
+        /**
+         * @param id ID.
+         * @param companyId Company ID.
+         */
+        Product(int id, int companyId) {
+            this.id = id;
+            this.companyId = companyId;
+        }
+    }
+
+    /** Holder for {@link QuerySqlFunction}-annotated static methods. */
+    public static class Functions {
+        /** Calls {@code U.sleep(1_000)}, ignoring interruption, and returns {@code 0}. */
+        @QuerySqlFunction
+        public static int sleep() {
+            try {
+                U.sleep(1_000);
+            } catch (IgniteInterruptedCheckedException ignored) {
+                // No-op.
+            }
+
+            return 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index 0e6806f..ced28bc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -17,185 +17,25 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
-import java.io.Serializable;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cache.query.annotations.QuerySqlField;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.CAX;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
-import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import javax.cache.CacheException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 
 /**
  * Test for distributed queries with node restarts.
  */
-public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static final String QRY_0 = "select co._key, count(*) cnt\n" +
-        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
-        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
-        "group by co._key order by cnt desc, co._key";
-
-    /** */
-    private static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" +
-        "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" +
-        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
-        "group by co._key order by cnt desc, co._key";
-
-    /** */
-    private static final String QRY_1 = "select pr._key, co._key\n" +
-        "from \"pr\".Product pr, \"co\".Company co\n" +
-        "where pr.companyId = co._key\n" +
-        "order by co._key, pr._key ";
-
-    /** */
-    private static final String QRY_1_BROADCAST = "select pr._key, co._key\n" +
-        "from \"co\".Company co, \"pr\".Product pr \n" +
-        "where pr.companyId = co._key\n" +
-        "order by co._key, pr._key ";
-
-    /** */
-    private static final int GRID_CNT = 6;
-
-    /** */
-    private static final int PERS_CNT = 600;
-
-    /** */
-    private static final int PURCHASE_CNT = 6000;
-
-    /** */
-    private static final int COMPANY_CNT = 25;
-
-    /** */
-    private static final int PRODUCT_CNT = 100;
-
-    /** */
-    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration c = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(ipFinder);
-
-        c.setDiscoverySpi(disco);
-
-        int i = 0;
-
-        CacheConfiguration<?, ?>[] ccs = new CacheConfiguration[4];
-
-        for (String name : F.asList("pe", "pu", "co", "pr")) {
-            CacheConfiguration<?, ?> cc = defaultCacheConfiguration();
-
-            cc.setName(name);
-            cc.setCacheMode(PARTITIONED);
-            cc.setBackups(2);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-            cc.setAtomicityMode(TRANSACTIONAL);
-            cc.setRebalanceMode(SYNC);
-            cc.setLongQueryWarningTimeout(15_000);
-            cc.setAffinity(new RendezvousAffinityFunction(false, 60));
-
-            switch (name) {
-                case "pe":
-                    cc.setIndexedTypes(
-                        Integer.class, Person.class
-                    );
-
-                    break;
-
-                case "pu":
-                    cc.setIndexedTypes(
-                        Integer.class, Purchase.class
-                    );
-
-                    break;
-
-                case "co":
-                    cc.setIndexedTypes(
-                        Integer.class, Company.class
-                    );
-
-                    break;
-
-                case "pr":
-                    cc.setIndexedTypes(
-                        Integer.class, Product.class
-                    );
-
-                    break;
-            }
-
-            ccs[i++] = cc;
-        }
-
-        c.setCacheConfiguration(ccs);
-
-        return c;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(GRID_CNT);
-
-        fillCaches();
-    }
-
-    /**
-     *
-     */
-    private void fillCaches() {
-        IgniteCache<Integer, Company> co = grid(0).cache("co");
-
-        for (int i = 0; i < COMPANY_CNT; i++)
-            co.put(i, new Company(i));
-
-        IgniteCache<Integer, Product> pr = grid(0).cache("pr");
-
-        Random rnd = new GridRandom();
-
-        for (int i = 0; i < PRODUCT_CNT; i++)
-            pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
-
-        IgniteCache<Integer, Person> pe = grid(0).cache("pe");
-
-        for (int i = 0; i < PERS_CNT; i++)
-            pe.put(i, new Person(i));
-
-        IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");
-
-        for (int i = 0; i < PURCHASE_CNT; i++) {
-            int persId = rnd.nextInt(PERS_CNT);
-            int prodId = rnd.nextInt(PRODUCT_CNT);
-
-            pu.put(i, new Purchase(persId, prodId));
-        }
-    }
+public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest {
     /**
      * @throws Exception If failed.
      */
@@ -319,13 +159,6 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
                     else {
                         IgniteCache<?, ?> cache = grid(g).cache("co");
 
-                        SqlFieldsQuery qry;
-
-                        if (broadcastQry)
-                            qry = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true);
-                        else
-                            qry = new SqlFieldsQuery(QRY_1).setDistributedJoins(true);
-
                         assertEquals(rRes, cache.query(qry1).getAll());
                     }
 
@@ -392,85 +225,4 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
 
         info("Stopped.");
     }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /**
-     *
-     */
-    private static class Person implements Serializable {
-        /** */
-        @QuerySqlField(index = true)
-        int id;
-
-        /**
-         * @param id ID.
-         */
-        Person(int id) {
-            this.id = id;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Purchase implements Serializable {
-        /** */
-        @QuerySqlField(index = true)
-        int personId;
-
-        /** */
-        @QuerySqlField(index = true)
-        int productId;
-
-        /**
-         * @param personId Person ID.
-         * @param productId Product ID.
-         */
-        Purchase(int personId, int productId) {
-            this.personId = personId;
-            this.productId = productId;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Company implements Serializable {
-        /** */
-        @QuerySqlField(index = true)
-        int id;
-
-        /**
-         * @param id ID.
-         */
-        Company(int id) {
-            this.id = id;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Product implements Serializable {
-        /** */
-        @QuerySqlField(index = true)
-        int id;
-
-        /** */
-        @QuerySqlField(index = true)
-        int companyId;
-
-        /**
-         * @param id ID.
-         * @param companyId Company ID.
-         */
-        Product(int id, int companyId) {
-            this.id = id;
-            this.companyId = companyId;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
index 8b33a46..154daa0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -267,7 +267,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest
                                         continue;
 
                                     if (th.getMessage() != null &&
-                                            th.getMessage().startsWith("Failed to fetch data from node:")) {
+                                        th.getMessage().startsWith("Failed to fetch data from node:")) {
                                         failedOnRemoteFetch = true;
 
                                         break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 80bd62e..4baaf8f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@ -1,7 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
 /**
- * Created by vozerov on 31.10.2016.
+ * Test for cancel of query containing distributed joins.
  */
-public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest {
-}
+public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest extends IgniteCacheQueryAbstractDistributedJoinSelfTest {
+    /** */
+    public void testCancel1() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, false);
+    }
+
+    /** */
+    public void testCancel2() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, false);
+    }
+
+    /** */
+    public void testCancel3() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, false);
+    }
+
+    /** */
+    public void testCancel4() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, false);
+    }
+
+    /** */
+    public void testTimeout1() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 1, TimeUnit.MILLISECONDS, true);
+    }
+
+    /** */
+    public void testTimeout2() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 50, TimeUnit.MILLISECONDS, true);
+    }
+
+    /** */
+    public void testTimeout3() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 100, TimeUnit.MILLISECONDS, true);
+    }
+
+    /** */
+    public void testTimeout4() throws Exception {
+        testQueryCancel(grid(0), "pe", QRY_0, 500, TimeUnit.MILLISECONDS, true);
+    }
+
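+    /** Runs the query, closing the cursor after the delay or setting a query timeout, then checks cleanup. */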
+    private void testQueryCancel(Ignite ignite, String cacheName, String sql, int timeoutUnits, TimeUnit timeUnit,
+                           boolean timeout) throws Exception {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql).setDistributedJoins(true);
+
+        IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+        final QueryCursor<List<?>> cursor;
+        if (timeout) {
+            qry.setTimeout(timeoutUnits, timeUnit);
+
+            cursor = cache.query(qry);
+        } else {
+            cursor = cache.query(qry);
+
+            ignite.scheduler().runLocal(new Runnable() {
+                @Override public void run() {
+                    cursor.close();
+                }
+            }, timeoutUnits, timeUnit);
+        }
+
+        try (QueryCursor<List<?>> ignored = cursor) {
+            cursor.iterator();
+        }
+        catch (CacheException ex) {
+            log().error("Got expected exception", ex);
+
+            assertTrue("Must throw correct exception", ex.getCause() instanceof QueryCancelledException);
+        }
+
+        // Give some time to clean up.
+        Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000);
+
+        checkCleanState();
+    }
+
+    /**
+     * Validates clean state on all participating nodes after query cancellation.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkCleanState() {
+        for (int i = 0; i < GRID_CNT; i++) {
+            IgniteEx grid = grid(i);
+
+            // Validate everything was cleaned up.
+            ConcurrentMap<UUID, ?> map = U.field(((IgniteH2Indexing) U.field(U.field(
+                    grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess");
+
+            String msg = "Map executor state is not cleared";
+
+            // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
+            for (Object result : map.values()) {
+                Map<Long, ?> m = U.field(result, "res");
+
+                assertEquals(msg, 0, m.size());
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index b7e6403..7f98d0a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -63,7 +63,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheP
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedSnapshotEnabledQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNoRebalanceSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeFailTest;
-import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartDistributedJoinSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryAbstractDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest2;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest;
@@ -125,7 +125,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheOffheapIndexScanTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
-        suite.addTestSuite(IgniteCacheQueryNodeRestartDistributedJoinSelfTest.class);
+        suite.addTestSuite(IgniteCacheQueryAbstractDistributedJoinSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryNodeFailTest.class);
         suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class);
         suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8b9abe8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 5722c01..be7523f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheD
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCachePartitionedFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
@@ -100,6 +101,7 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite {
         suite.addTestSuite(IgniteCacheDistributedQueryStopOnCancelOrTimeoutSelfTest.class);
         suite.addTestSuite(IgniteCacheDistributedQueryCancelSelfTest.class);
         suite.addTestSuite(IgniteCacheLocalQueryCancelOrTimeoutSelfTest.class);
+        suite.addTestSuite(IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.class);
 
         // Other.
         suite.addTestSuite(CacheQueryNewClientSelfTest.class);


Mime
View raw message