ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [02/22] ignite git commit: GG-11360 - Implement SQL queries cancellation (#18)
Date Mon, 31 Oct 2016 13:48:03 GMT
GG-11360 - Implement SQL queries cancellation (#18)

* GG-11360 Merged IGNITE-2680 to ignite-1.6.3.

* GG-11360 Test cleanup.

* GG-11360 Fixing broken tests.

* GG-11360 Fixing test.

* GG-11360 Fixing test.

* GG-11360 Fixing broken tests.

* GG-11360 Added test for forever-running query cancellation on node restart.

* GG-11360 Fixing race.

* GG-11360 Added test for forever-running query cancellation on node stop.

* GG-11360 Cleanup.

* GG-11360 Support for local query cancellation/timeout.

* GG-11360 Increase test duration.

* GG-11360 Remove redundant catch block.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Fix formatting.

* GG-11360 Simplify test.

* GG-11360 Simplify test.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* GG-11360 Fixing issues.

* Merge remote-tracking branch 'remotes/gg/ignite-1.6.10' into ignite-gg-11360

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java

* GG-11360 Review fixes.

* GG-11360 Review fixes.

* GG-11360 Review fixes.

* GG-11360 Review fixes.

* GG-11360 Review fixes.

* GG-11360 Review fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/80abd1b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/80abd1b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/80abd1b7

Branch: refs/heads/ignite-4154
Commit: 80abd1b72e4fc7b0b95212e7f53c700c0fe21156
Parents: 7adfbcf
Author: Aleksei Scherbakov <alexey.scherbakoff@gmail.com>
Authored: Fri Oct 14 19:33:07 2016 +0300
Committer: Sergi Vladykin <svladykin@users.noreply.github.com>
Committed: Fri Oct 14 19:33:07 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteScheduler.java |  13 +
 .../cache/query/QueryCancelledException.java    |  35 +++
 .../apache/ignite/cache/query/QueryCursor.java  |   8 +-
 .../ignite/cache/query/SqlFieldsQuery.java      |  26 ++
 .../org/apache/ignite/cache/query/SqlQuery.java |  25 ++
 .../ignite/internal/IgniteSchedulerImpl.java    |  18 ++
 .../processors/cache/QueryCursorImpl.java       |  92 +++++--
 .../processors/query/GridQueryCancel.java       |  84 +++++++
 .../processors/query/GridQueryFieldsResult.java |   3 +-
 .../query/GridQueryFieldsResultAdapter.java     |   3 +-
 .../processors/query/GridQueryIndexing.java     |  26 +-
 .../processors/query/GridQueryProcessor.java    | 115 +++++----
 .../twostep/messages/GridQueryFailResponse.java |  34 ++-
 .../h2/twostep/messages/GridQueryRequest.java   |  31 ++-
 .../processors/query/h2/IgniteH2Indexing.java   | 177 ++++++++++---
 .../query/h2/twostep/GridMapQueryExecutor.java  |  42 +++-
 .../h2/twostep/GridReduceQueryExecutor.java     |  99 ++++++--
 ...niteCacheDistributedQueryCancelSelfTest.java | 176 +++++++++++++
 ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 248 +++++++++++++++++++
 .../IgniteCacheQueryNodeRestartSelfTest2.java   | 125 ++++++----
 ...eCacheLocalQueryCancelOrTimeoutSelfTest.java | 158 ++++++++++++
 .../h2/GridIndexingSpiAbstractSelfTest.java     |   9 +-
 .../IgniteCacheQuerySelfTestSuite2.java         |   8 +
 23 files changed, 1341 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/IgniteScheduler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteScheduler.java b/modules/core/src/main/java/org/apache/ignite/IgniteScheduler.java
index b0ea960..2e2553b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteScheduler.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteScheduler.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite;
 
+import java.io.Closeable;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgniteRunnable;
@@ -61,6 +63,17 @@ public interface IgniteScheduler {
     public IgniteFuture<?> runLocal(@Nullable Runnable r);
 
     /**
+     * Executes given closure after the delay.
+     * <p>
+     * Note that class {@link IgniteRunnable} implements {@link Runnable}.
+     * @param r Runnable to execute.
+     * @param delay Initial delay.
+     * @param timeUnit Time granularity.
+     * @return java.io.Closeable which can be used to cancel execution.
+     */
+    public Closeable runLocal(@Nullable Runnable r, long delay, TimeUnit timeUnit);
+
+    /**
      * Executes given callable on internal system thread pool asynchronously.
      * <p>
      * Note that class {@link IgniteRunnable} implements {@link Runnable} and class {@link IgniteOutClosure}

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java
new file mode 100644
index 0000000..5f5ffdc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCancelledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * The exception is thrown if a query was cancelled or timed out while executing.
+ */
+public class QueryCancelledException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Default constructor.
+     */
+    public QueryCancelledException() {
+        super("The query was cancelled while executing.");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java
index 84b6862..1af47ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryCursor.java
@@ -22,8 +22,6 @@ import java.util.List;
 /**
  * Query result cursor. Implements {@link Iterable} only for convenience, e.g. {@link #iterator()}
  * can be obtained only once. Also if iteration is started then {@link #getAll()} method calls are prohibited.
- * <p>
- * Not thread safe and must be used from single thread only.
  */
 public interface QueryCursor<T> extends Iterable<T>, AutoCloseable {
     /**
@@ -39,7 +37,11 @@ public interface QueryCursor<T> extends Iterable<T>, AutoCloseable {
     public List<T> getAll();
 
     /**
-     * Closes all resources related to this cursor.
+     * Closes all resources related to this cursor. If the query execution is in progress
+     * (which is possible in case of invoking from another thread), a cancel will be attempted.
+     * Subsequent calls to this method have no effect.
+     * <p>
+     * Note: don't forget to close query cursors. Not doing so may lead to various resource leaks.
      */
     @Override public void close();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index b2dd181..d1a5117 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.cache.query;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -56,6 +58,9 @@ public final class SqlFieldsQuery extends Query<List<?>> {
     /** Collocation flag. */
     private boolean collocated;
 
+    /** Query timeout in millis. */
+    private int timeout;
+
     /**
      * Constructs SQL fields query.
      *
@@ -121,6 +126,27 @@ public final class SqlFieldsQuery extends Query<List<?>> {
     }
 
     /**
+     * Gets the query execution timeout in milliseconds.
+     *
+     * @return Timeout value.
+     */
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets the query execution timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+     * @param timeout Timeout value. Zero value disables timeout.
+     * @param timeUnit Time unit.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setTimeout(int timeout, TimeUnit timeUnit) {
+        this.timeout = GridQueryProcessor.validateTimeout(timeout, timeUnit);
+
+        return this;
+    }
+
+    /**
      * Checks if this query is collocated.
      *
      * @return {@code true} If the query is collocated.

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index be3b390..51c6cb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cache.query;
 
+import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -43,6 +44,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     @GridToStringInclude
     private Object[] args;
 
+    /** Timeout in millis. */
+    private int timeout;
+
     /**
      * Constructs query for the given type name and SQL query.
      *
@@ -130,6 +134,27 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
         return this;
     }
 
+    /**
+     * Gets the query execution timeout in milliseconds.
+     *
+     * @return Timeout value.
+     */
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets the query execution timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+     * @param timeout Timeout value. Zero value disables timeout.
+     * @param timeUnit Time granularity.
+     * @return {@code this} For chaining.
+     */
+    public SqlQuery<K, V> setTimeout(int timeout, TimeUnit timeUnit) {
+        this.timeout = GridQueryProcessor.validateTimeout(timeout, timeUnit);
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlQuery<K, V> setPageSize(int pageSize) {
         return (SqlQuery<K, V>)super.setPageSize(pageSize);

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
index ed8d1a4..623b1f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java
@@ -17,17 +17,20 @@
 
 package org.apache.ignite.internal;
 
+import java.io.Closeable;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.scheduler.SchedulerFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link IgniteScheduler} implementation.
@@ -68,6 +71,21 @@ public class IgniteSchedulerImpl implements IgniteScheduler, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public Closeable runLocal(@Nullable Runnable r, long delay, TimeUnit timeUnit) {
+        A.notNull(r, "r");
+        A.ensure(delay > 0, "Illegal delay");
+
+        guard();
+
+        try {
+            return ctx.timeout().schedule(r, timeUnit.toMillis(delay), -1);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <R> IgniteFuture<R> callLocal(Callable<R> c) {
         A.notNull(c, "c");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 672a37d..f68426e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -20,15 +20,28 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 
+import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.CLOSED;
+import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.EXECUTION;
+import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.IDLE;
+import static org.apache.ignite.internal.processors.cache.QueryCursorImpl.State.RESULT_READY;
+
 /**
  * Query cursor implementation.
  */
 public class QueryCursorImpl<T> implements QueryCursorEx<T> {
+    /** */
+    private final static AtomicReferenceFieldUpdater<QueryCursorImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");
+
     /** Query executor. */
     private Iterable<T> iterExec;
 
@@ -36,29 +49,43 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     private Iterator<T> iter;
 
     /** */
-    private boolean iterTaken;
+    private volatile State state = IDLE;
 
     /** */
     private List<GridQueryFieldMetadata> fieldsMeta;
 
+    /** */
+    private final GridQueryCancel cancel;
+
     /**
      * @param iterExec Query executor.
+     * @param cancel Cancellation closure.
      */
-    public QueryCursorImpl(Iterable<T> iterExec) {
+    public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel) {
         this.iterExec = iterExec;
+        this.cancel = cancel;
+    }
+
+    /**
+     * @param iterExec Query executor.
+     */
+    public QueryCursorImpl(Iterable<T> iterExec) {
+        this(iterExec, null);
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<T> iterator() {
-        if (iter == null && iterTaken)
-            throw new IgniteException("Cursor is closed.");
+        if (!STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION))
+            throw new IgniteException("Iterator is already fetched or query was cancelled.");
 
-        if (iterTaken)
-            throw new IgniteException("Iterator is already taken from this cursor.");
+        iter = iterExec.iterator();
 
-        iterTaken = true;
+        if (!STATE_UPDATER.compareAndSet(this, EXECUTION, RESULT_READY)) {
+            // Handle race with cancel and make sure the iterator resources are freed correctly.
+            closeIter();
 
-        iter = iterExec.iterator();
+            throw new CacheException(new QueryCancelledException());
+        }
 
         assert iter != null;
 
@@ -93,18 +120,35 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        Iterator<T> i;
-
-        if ((i = iter) != null) {
-            iter = null;
-
-            if (i instanceof AutoCloseable) {
-                try {
-                    ((AutoCloseable)i).close();
-                }
-                catch (Exception e) {
-                    throw new IgniteException(e);
-                }
+        while(state != CLOSED) {
+            if (STATE_UPDATER.compareAndSet(this, RESULT_READY, CLOSED)) {
+                closeIter();
+
+                return;
+            }
+
+            if (STATE_UPDATER.compareAndSet(this, EXECUTION, CLOSED)) {
+                if (cancel != null)
+                    cancel.cancel();
+
+                return;
+            }
+
+            if (STATE_UPDATER.compareAndSet(this, IDLE, CLOSED))
+                return;
+        }
+    }
+
+    /**
+     * Closes iterator.
+     */
+    private void closeIter() {
+        if (iter instanceof AutoCloseable) {
+            try {
+                ((AutoCloseable)iter).close();
+            }
+            catch (Exception e) {
+                throw new IgniteException(e);
             }
         }
     }
@@ -122,4 +166,12 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     @Override public List<GridQueryFieldMetadata> fieldsMeta() {
         return fieldsMeta;
     }
+
+    /** Query cursor state */
+    protected enum State {
+        /** Idle. */IDLE,
+        /** Executing. */EXECUTION,
+        /** Result ready. */RESULT_READY,
+        /** Closed. */CLOSED,
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
new file mode 100644
index 0000000..47f1208
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCancel.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Holds query cancel state.
+ */
+public class GridQueryCancel {
+    /** */
+    private volatile boolean cancelled;
+
+    /** */
+    private volatile boolean completed;
+
+    /** */
+    private volatile Runnable clo;
+
+    /**
+     * Sets a cancel closure. The closure must be idempotent to multiple invocations.
+     *
+     * @param clo Cancel closure.
+     */
+    public void set(Runnable clo) throws QueryCancelledException{
+        checkCancelled();
+
+        this.clo = clo;
+    }
+
+    /**
+     * Spins until a query is completed.
+     * Only one thread can enter this method.
+     * This is guaranteed by {@link org.apache.ignite.internal.processors.cache.QueryCursorImpl}
+     */
+    public void cancel() {
+        cancelled = true;
+
+        int attempt = 0;
+
+        while (!completed) {
+            if (clo != null) clo.run();
+
+            try {
+                U.sleep(++attempt * 10);
+            } catch (IgniteInterruptedCheckedException ignored) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Stops query execution if a user requested cancel.
+     */
+    public void checkCancelled() throws QueryCancelledException{
+        if (cancelled)
+            throw new QueryCancelledException();
+    }
+
+    /**
+     * Sets completed state.
+     * The method must be called when a query is completed for any reason, typically in a finally block.
+     */
+    public void setCompleted() {
+        completed = true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java
index 25c8caba..e32a687 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResult.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query;
 
 import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 
 /**
@@ -37,5 +38,5 @@ public interface GridQueryFieldsResult {
      *
      * @return Iterator over queried fields.
      */
-    IgniteSpiCloseableIterator<List<?>> iterator();
+    IgniteSpiCloseableIterator<List<?>> iterator() throws IgniteCheckedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java
index fc01d2d..7f1d175 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryFieldsResultAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query;
 
 import java.util.Collections;
 import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.jetbrains.annotations.Nullable;
 
@@ -51,7 +52,7 @@ public class GridQueryFieldsResultAdapter implements GridQueryFieldsResult {
     }
 
     /** {@inheritDoc} */
-    @Override public GridCloseableIterator<List<?>> iterator() {
+    @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
         return it;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 7697a12..b1b3c68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -58,17 +57,6 @@ public interface GridQueryIndexing {
     public void stop() throws IgniteCheckedException;
 
     /**
-     * Runs two step query.
-     *
-     * @param cctx Cache context.
-     * @param qry Query.
-     * @param keepCacheObjects If {@code true}, cache objects representation will be preserved.
-     * @return Cursor.
-     */
-    public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry,
-        boolean keepCacheObjects);
-
-    /**
      * Parses SQL query into two step query and executes it.
      *
      * @param cctx Cache context.
@@ -93,11 +81,14 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @param params Query parameters.
      * @param filters Space name and key filters.
+     * @param timeout Query timeout in milliseconds.
+     * @param cancel Query cancel.
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
-    public GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry,
-        Collection<Object> params, IndexingQueryFilter filters) throws IgniteCheckedException;
+    public GridQueryFieldsResult execute(@Nullable String spaceName, String qry,
+        Collection<Object> params, IndexingQueryFilter filters, int timeout, GridQueryCancel cancel)
+        throws IgniteCheckedException;
 
     /**
      * Executes regular query.
@@ -241,4 +232,9 @@ public interface GridQueryIndexing {
      * @param reconnectFut Reconnect future.
      */
     public void onDisconnected(IgniteFuture<?> reconnectFut);
-}
+
+    /**
+     * Cancels all executing queries.
+     */
+    public void cancelAllQueries();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1605188..3d185c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -39,6 +39,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -71,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -434,6 +435,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     @Override public void onKernalStop(boolean cancel) {
         super.onKernalStop(cancel);
 
+        if (cancel && idx != null)
+            try {
+                while (!busyLock.tryBlock(500))
+                    idx.cancelAllQueries();
+
+                return;
+            }
+            catch (InterruptedException e) {
+                U.warn(log, "Interrupted while waiting for active queries cancellation.");
+
+                Thread.currentThread().interrupt();
+            }
+
         busyLock.block();
     }
 
@@ -763,37 +777,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param space Space name.
-     * @param qry Query.
-     * @return Cursor.
-     */
-    public Iterable<List<?>> queryTwoStep(String space, final GridCacheTwoStepQuery qry) {
-        checkxEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<Object, Object> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(cctx, new IgniteOutClosureX<Iterable<List<?>>>() {
-                @Override public Iterable<List<?>> applyx() throws IgniteCheckedException {
-                    return idx.queryTwoStep(
-                        cctx,
-                        qry,
-                        cctx.keepBinary());
-                }
-            }, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -806,7 +789,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         try {
             return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
-                @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
+                @Override public QueryCursor<List<?>> applyx() {
                     return idx.queryTwoStep(cctx, qry);
                 }
             }, true);
@@ -942,6 +925,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param timeout Timeout value expressed in {@code timeUnit}; must be non-negative.
+     * @param timeUnit Unit of {@code timeout}; must not be finer than milliseconds.
+     * @return Timeout converted to milliseconds.
+     */
+    public static int validateTimeout(int timeout, TimeUnit timeUnit) {
+        A.ensure(timeUnit != TimeUnit.MICROSECONDS && timeUnit != TimeUnit.NANOSECONDS,
+                "timeUnit minimal resolution is millisecond.");
+
+        A.ensure(timeout >= 0, "timeout value should be non-negative.");
+
+        long tmp = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+        // Range-check the converted value: the int argument itself can never exceed Integer.MAX_VALUE.
+        A.ensure(tmp <= Integer.MAX_VALUE, "timeout value too large.");
+
+        return (int) tmp;
+    }
+
+    /**
      * Closeable iterator.
      */
     private interface ClIter<X> extends AutoCloseable, Iterator<X> {
@@ -962,20 +963,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
                 @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                    String space = cctx.name();
-                    String sql = qry.getSql();
-                    Object[] args = qry.getArgs();
-
-                    final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
-                        idx.backupFilter(null, requestTopVer.get(), null));
+                    final String space = cctx.name();
+                    final String sql = qry.getSql();
+                    final Object[] args = qry.getArgs();
+                    final GridQueryCancel cancel = new GridQueryCancel();
 
-                    sendQueryExecutedEvent(sql, args);
+                    final GridQueryFieldsResult res = idx.execute(space, sql, F.asList(args),
+                        idx.backupFilter(null, requestTopVer.get(), null), qry.getTimeout(), cancel);
 
                     QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
                         @Override public Iterator<List<?>> iterator() {
-                            return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                            try {
+                                sendQueryExecutedEvent(sql, args);
+
+                                return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw new IgniteException(e);
+                            }
                         }
-                    });
+                    }, cancel);
 
                     cursor.fieldsMeta(res.metaData());
 
@@ -1076,14 +1083,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Type name.
      */
     public static String typeName(String clsName) {
-        int packageEnd = clsName.lastIndexOf('.');
+        int pkgEnd = clsName.lastIndexOf('.');
 
-        if (packageEnd >= 0 && packageEnd < clsName.length() - 1)
-            clsName = clsName.substring(packageEnd + 1);
+        if (pkgEnd >= 0 && pkgEnd < clsName.length() - 1)
+            clsName = clsName.substring(pkgEnd + 1);
 
-        if (clsName.endsWith("[]")) {
+        if (clsName.endsWith("[]"))
             clsName = clsName.substring(0, clsName.length() - 2) + "_array";
-        }
 
         int parentEnd = clsName.lastIndexOf('$');
 
@@ -1154,7 +1160,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             return executeQuery(cctx, new IgniteOutClosureX<GridQueryFieldsResult>() {
                 @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException {
-                    return idx.queryFields(space, clause, params, filters);
+                    return idx.execute(space, clause, params, filters, 0, null);
                 }
             }, false);
         }
@@ -1519,13 +1525,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         for (Map.Entry<String, String> entry : qryEntity.getFields().entrySet()) {
             ClassProperty prop = buildClassProperty(
-                d.keyClass(),
-                d.valueClass(),
-                entry.getKey(),
-                U.classForName(entry.getValue(), Object.class),
-                aliases,
-                coCtx);
-
+                    d.keyClass(),
+                    d.valueClass(),
+                    entry.getKey(),
+                    U.classForName(entry.getValue(), Object.class),
+                    aliases,
+                    coCtx);
 
             d.addProperty(prop, false);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index 499438d..0baf6ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -27,6 +28,12 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  * Error message.
  */
 public class GridQueryFailResponse implements Message {
+    /** General error failure type. */
+    public static final byte GENERAL_ERROR = 0;
+
+    /** Cancelled by originator failure type. */
+    public static final byte CANCELLED_BY_ORIGINATOR = 1;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -36,6 +43,9 @@ public class GridQueryFailResponse implements Message {
     /** */
     private String errMsg;
 
+    /** */
+    private byte failCode;
+
     /**
      * Default constructor.
      */
@@ -50,6 +60,7 @@ public class GridQueryFailResponse implements Message {
     public GridQueryFailResponse(long qryReqId, Throwable err) {
         this.qryReqId = qryReqId;
         this.errMsg = err.getClass() + ":" + err.getMessage();
+        this.failCode = err instanceof QueryCancelledException ? CANCELLED_BY_ORIGINATOR : GENERAL_ERROR;
     }
 
     /**
@@ -66,6 +77,13 @@ public class GridQueryFailResponse implements Message {
         return errMsg;
     }
 
+    /**
+     * @return Failure type code, e.g. {@link #GENERAL_ERROR} or {@link #CANCELLED_BY_ORIGINATOR}.
+     */
+    public byte failCode() {
+        return failCode;
+    }
+
     /** {@inheritDoc} */
     @Override public void onAckReceived() {
         // No-op.
@@ -100,6 +118,12 @@ public class GridQueryFailResponse implements Message {
 
                 writer.incrementState();
 
+            case 2:
+                if (!writer.writeByte("failCode", failCode))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -129,6 +153,14 @@ public class GridQueryFailResponse implements Message {
 
                 reader.incrementState();
 
+            case 2:
+                failCode = reader.readByte("failCode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridQueryFailResponse.class);
@@ -141,6 +173,6 @@ public class GridQueryFailResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 2;
+        return 3;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 60d348b..550cf9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -66,6 +66,9 @@ public class GridQueryRequest implements Message {
     @GridToStringInclude
     private int[] parts;
 
+    /** */
+    private int timeout;
+
     /**
      * Default constructor.
      */
@@ -81,6 +84,7 @@ public class GridQueryRequest implements Message {
      * @param topVer Topology version.
      * @param extraSpaces All space names participating in query other than {@code space}.
      * @param parts Optional partitions for unstable topology.
+     * @param timeout Timeout in millis.
      */
     public GridQueryRequest(
         long reqId,
@@ -89,7 +93,8 @@ public class GridQueryRequest implements Message {
         Collection<GridCacheSqlQuery> qrys,
         AffinityTopologyVersion topVer,
         List<String> extraSpaces,
-        int[] parts) {
+        int[] parts,
+        int timeout) {
         this.reqId = reqId;
         this.pageSize = pageSize;
         this.space = space;
@@ -98,6 +103,7 @@ public class GridQueryRequest implements Message {
         this.topVer = topVer;
         this.extraSpaces = extraSpaces;
         this.parts = parts;
+        this.timeout = timeout;
     }
 
     /**
@@ -163,6 +169,13 @@ public class GridQueryRequest implements Message {
     }
 
     /**
+     * @return Query timeout in milliseconds.
+     */
+    public int timeout() {
+        return this.timeout;
+    }
+
+    /**
      * @return Queries.
      */
     public Collection<GridCacheSqlQuery> queries() throws IgniteCheckedException {
@@ -233,6 +246,12 @@ public class GridQueryRequest implements Message {
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeInt("timeout", timeout))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -302,6 +321,14 @@ public class GridQueryRequest implements Message {
 
                 reader.incrementState();
 
+            case 7:
+                timeout = reader.readInt("timeout");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridQueryRequest.class);
@@ -314,6 +341,6 @@ public class GridQueryRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index a5f283e..ab332c1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -51,12 +51,14 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
@@ -75,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
@@ -120,9 +123,12 @@ import org.h2.api.JavaObjectSerializer;
 import org.h2.command.CommandInterface;
 import org.h2.constant.ErrorCode;
 import org.h2.constant.SysProperties;
+import org.h2.engine.Session;
 import org.h2.index.Index;
 import org.h2.index.SpatialIndex;
+import org.h2.jdbc.JdbcConnection;
 import org.h2.jdbc.JdbcPreparedStatement;
+import org.h2.jdbc.JdbcStatement;
 import org.h2.message.DbException;
 import org.h2.mvstore.cache.CacheLongKeyLIRS;
 import org.h2.server.web.WebServer;
@@ -350,7 +356,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             PreparedStatement stmt = cache.get(sql);
 
-            if (stmt != null && !stmt.isClosed()) {
+            if (stmt != null && !((JdbcStatement)stmt).wasCancelled()) {
                 assert stmt.getConnection() == c;
 
                 return stmt;
@@ -727,28 +733,33 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry,
-        @Nullable final Collection<Object> params, final IndexingQueryFilter filters)
+    @Override public GridQueryFieldsResult execute(@Nullable final String spaceName, final String qry,
+        @Nullable final Collection<Object> params, final IndexingQueryFilter filters,
+        final int timeout, final GridQueryCancel cancel)
         throws IgniteCheckedException {
         setFilters(filters);
 
         try {
-            Connection conn = connectionForThread(schema(spaceName));
+            final Connection conn = connectionForThread(schema(spaceName));
 
-            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, params, true);
+            final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
 
-            List<GridQueryFieldMetadata> meta = null;
+            List<GridQueryFieldMetadata> meta;
 
-            if (rs != null) {
-                try {
-                    meta = meta(rs.getMetaData());
-                }
-                catch (SQLException e) {
-                    throw new IgniteCheckedException("Failed to get meta data.", e);
-                }
+            try {
+                meta = meta(stmt.getMetaData());
+            }
+            catch (SQLException e) {
+                throw new IgniteCheckedException("Cannot prepare query metadata", e);
             }
 
-            return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs));
+            return new GridQueryFieldsResultAdapter(meta, null) {
+                @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
+                    ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
+
+                    return new FieldsIterator(rs);
+                }
+            };
         }
         finally {
             setFilters(null);
@@ -808,18 +819,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Executes sql query.
+     * Prepares sql statement.
      *
-     * @param conn Connection,.
-     * @param sql Sql query.
-     * @param params Parameters.
-     * @param useStmtCache If {@code true} uses statement cache.
-     * @return Result.
+     * @param conn Connection.
+     * @param sql Sql.
+     * @param params Params.
+     * @param useStmtCache If {@code true} use stmt cache.
+     * @return Prepared statement with set parameters.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeSqlQuery(Connection conn, String sql, Collection<Object> params, boolean useStmtCache)
-        throws IgniteCheckedException {
-        PreparedStatement stmt;
+    private PreparedStatement preparedStatementWithParams(Connection conn, String sql, Collection<Object> params,
+        boolean useStmtCache) throws IgniteCheckedException {
+        final PreparedStatement stmt;
 
         try {
             stmt = prepareStatement(conn, sql, useStmtCache);
@@ -840,12 +851,54 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         bindParameters(stmt, params);
 
+        return stmt;
+    }
+
+    /**
+     * Executes the prepared statement. A positive {@code timeoutMillis} is used as the session query timeout.
+     *
+     * @param conn Connection.
+     * @param stmt Statement.
+     * @param cancel Query cancel; may be {@code null}, in which case no cancel hook is registered.
+     * @return Result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
+        int timeoutMillis, @Nullable GridQueryCancel cancel)
+        throws IgniteCheckedException {
+        // A positive timeout is set on the H2 session here and reset to 0 in the finally block below.
+        if (timeoutMillis > 0)
+            ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(timeoutMillis);
+
+        if (cancel != null) {
+            cancel.set(new Runnable() {
+                @Override public void run() {
+                    try {
+                        stmt.cancel();
+                    } catch (SQLException ignored) {
+                        // No-op.
+                    }
+                }
+            });
+        }
+
         try {
             return stmt.executeQuery();
         }
         catch (SQLException e) {
+            // Report H2 statement cancellation as QueryCancelledException instead of a generic failure.
+            if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                throw new QueryCancelledException();
+
             throw new IgniteCheckedException("Failed to execute SQL query.", e);
         }
+        finally {
+            if(cancel != null)
+                cancel.setCompleted();
+
+            if (timeoutMillis > 0)
+                ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(0);
+        }
     }
 
     /**
@@ -855,7 +908,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param conn Connection,.
      * @param sql Sql query.
      * @param params Parameters.
-     * @param useStmtCache If {@code true} uses statement cache.
+     * @param useStmtCache If {@code true} uses stmt cache.
+     * @param cancel Query cancel.
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
@@ -863,11 +917,35 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         Connection conn,
         String sql,
         @Nullable Collection<Object> params,
-        boolean useStmtCache) throws IgniteCheckedException {
+        boolean useStmtCache,
+        int timeoutMillis,
+        @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+        return executeSqlQueryWithTimer(space, preparedStatementWithParams(conn, sql, params, useStmtCache),
+            conn, sql, params, timeoutMillis, cancel);
+    }
+
+    /**
+     * Executes sql query and prints warning if query is too slow.
+     *
+     * @param space Space name.
+     * @param stmt Prepared statement for query.
+     * @param conn Connection.
+     * @param sql Sql query.
+     * @param params Parameters.
+     * @param cancel Query cancel.
+     * @return Result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private ResultSet executeSqlQueryWithTimer(String space, PreparedStatement stmt,
+        Connection conn,
+        String sql,
+        @Nullable Collection<Object> params,
+        int timeoutMillis,
+        @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
         long start = U.currentTimeMillis();
 
         try {
-            ResultSet rs = executeSqlQuery(conn, sql, params, useStmtCache);
+            ResultSet rs = executeSqlQuery(conn, stmt, timeoutMillis, cancel);
 
             long time = U.currentTimeMillis() - start;
 
@@ -876,7 +954,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (time > longQryExecTimeout) {
                 String msg = "Query execution is too long (" + time + " ms): " + sql;
 
-                ResultSet plan = executeSqlQuery(conn, "EXPLAIN " + sql, params, false);
+                ResultSet plan = executeSqlQuery(conn, preparedStatementWithParams(conn, "EXPLAIN " + sql,
+                    params, false), 0, null);
 
                 plan.next();
 
@@ -903,16 +982,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param qry Query.
      * @param params Query parameters.
      * @param tbl Target table of query to generate select.
+     * @param cancel Query cancel.
      * @return Result set.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeQuery(String space, String qry, @Nullable Collection<Object> params, TableDescriptor tbl)
+    private ResultSet executeQuery(String space, String qry, @Nullable Collection<Object> params, TableDescriptor tbl,
+        @Nullable GridQueryCancel cancel)
             throws IgniteCheckedException {
         Connection conn = connectionForThread(tbl.schemaName());
 
         String sql = generateQuery(qry, tbl);
 
-        return executeSqlQueryWithTimer(space, conn, sql, params, true);
+        return executeSqlQueryWithTimer(space,
+            preparedStatementWithParams(conn, sql, params, true),
+            conn, sql, params, 0, cancel);
     }
 
     /**
@@ -954,7 +1037,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         setFilters(filters);
 
         try {
-            ResultSet rs = executeQuery(spaceName, qry, params, tbl);
+            ResultSet rs = executeQuery(spaceName, qry, params, tbl, null);
 
             return new KeyValIterator(rs);
         }
@@ -964,11 +1047,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public Iterable<List<?>> queryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
-        final boolean keepCacheObj) {
+    private Iterable<List<?>> doQueryTwoStep(final GridCacheContext<?, ?> cctx, final GridCacheTwoStepQuery qry,
+        final boolean keepCacheObj,
+        final int timeoutMillis,
+        final GridQueryCancel cancel) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
-                return rdcQryExec.query(cctx, qry, keepCacheObj);
+                try {
+                    return rdcQryExec.query(cctx, qry, keepCacheObj, timeoutMillis, cancel);
+                }
+                finally {
+                    if (cancel != null)
+                        cancel.setCompleted();
+                }
             }
         };
     }
@@ -998,6 +1089,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
 
+        if(qry.getTimeout() > 0)
+            fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
+
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
         final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
@@ -1064,7 +1158,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         try {
                             ctx.cache().createMissingCaches();
                         }
-                        catch (IgniteCheckedException e1) {
+                        catch (IgniteCheckedException ignored) {
                             throw new CacheException("Failed to create missing caches.", e);
                         }
 
@@ -1104,7 +1198,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         twoStepQry.pageSize(qry.getPageSize());
 
-        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(queryTwoStep(cctx, twoStepQry, cctx.keepBinary()));
+        GridQueryCancel cancel = new GridQueryCancel();
+
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
+            doQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), qry.getTimeout(), cancel), cancel);
 
         cursor.fieldsMeta(meta);
 
@@ -1455,8 +1552,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (tbl == null)
             return -1;
 
-        IgniteSpiCloseableIterator<List<?>> iter = queryFields(spaceName,
-            "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null).iterator();
+        IgniteSpiCloseableIterator<List<?>> iter = execute(spaceName,
+                "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null, 0, null).iterator();
 
         return ((Number)iter.next().get(0)).longValue();
     }
@@ -2618,4 +2715,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             lastUsage = U.currentTimeMillis();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public void cancelAllQueries() {
+        for (Connection conn : conns)
+            U.close(conn, log);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 580058c..1f05bf7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.lang.reflect.Field;
 import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -452,10 +451,12 @@ public class GridMapQueryExecutor {
 
             for (GridCacheSqlQuery qry : qrys) {
                 ResultSet rs = h2.executeSqlQueryWithTimer(req.space(),
-                    h2.connectionForSpace(req.space()),
-                    qry.query(),
-                    F.asList(qry.parameters()),
-                    true);
+                        h2.connectionForSpace(req.space()),
+                        qry.query(),
+                        F.asList(qry.parameters()),
+                        true,
+                        req.timeout(),
+                        qr.cancels[i]);
 
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                     ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -509,6 +510,15 @@ public class GridMapQueryExecutor {
             // Release reserved partitions.
             for (GridReservable r : reserved)
                 r.release();
+
+            // Ensure all cancels state is correct.
+            if (qr != null)
+                for (int i = 0; i < qr.cancels.length; i++) {
+                    GridQueryCancel cancel = qr.cancels[i];
+
+                    if (cancel != null)
+                        cancel.setCompleted();
+                }
         }
     }
 
@@ -637,6 +647,9 @@ public class GridMapQueryExecutor {
         private final AtomicReferenceArray<QueryResult> results;
 
         /** */
+        private final GridQueryCancel[] cancels;
+
+        /** */
         private final GridCacheContext<?,?> cctx;
 
         /** */
@@ -652,6 +665,10 @@ public class GridMapQueryExecutor {
             this.cctx = cctx;
 
             results = new AtomicReferenceArray<>(qrys);
+            cancels = new GridQueryCancel[qrys];
+
+            for (int i = 0; i < cancels.length; i++)
+                cancels[i] = new GridQueryCancel();
         }
 
         /**
@@ -687,6 +704,9 @@ public class GridMapQueryExecutor {
             return true;
         }
 
+        /**
+         * Cancels the query.
+         */
         void cancel() {
             if (canceled)
                 return;
@@ -696,8 +716,16 @@ public class GridMapQueryExecutor {
             for (int i = 0; i < results.length(); i++) {
                 QueryResult res = results.get(i);
 
-                if (res != null)
+                if (res != null) {
                     res.close();
+
+                    continue;
+                }
+
+                GridQueryCancel cancel = cancels[i];
+
+                if (cancel != null)
+                    cancel.cancel();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 6a079f0..3fdbf42 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,8 +62,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
@@ -244,7 +246,7 @@ public class GridReduceQueryExecutor {
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
         QueryRun r = runs.get(msg.queryRequestId());
 
-        fail(r, node.id(), msg.error());
+        fail(r, node.id(), msg.error(), msg.failCode());
     }
 
     /**
@@ -252,9 +254,15 @@ public class GridReduceQueryExecutor {
      * @param nodeId Failed node ID.
      * @param msg Error message.
      */
-    private void fail(QueryRun r, UUID nodeId, String msg) {
-        if (r != null)
-            r.state(new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg), nodeId);
+    private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+        if (r != null) {
+            CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
+
+            if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR)
+                e.addSuppressed(new QueryCancelledException());
+
+            r.state(e, nodeId);
+        }
     }
 
     /**
@@ -312,7 +320,7 @@ public class GridReduceQueryExecutor {
         catch (Exception e) {
             U.error(log, "Error in message.", e);
 
-            fail(r, node.id(), "Error in message.");
+            fail(r, node.id(), "Error in message.", GridQueryFailResponse.GENERAL_ERROR);
 
             return;
         }
@@ -449,9 +457,12 @@ public class GridReduceQueryExecutor {
      * @param cctx Cache context.
      * @param qry Query.
      * @param keepBinary Keep binary.
-     * @return Cursor.
+     * @param timeoutMillis Timeout in milliseconds.
+     * @param cancel Query cancel.
+     * @return Rows iterator.
      */
-    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary) {
+    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary,
+        int timeoutMillis, GridQueryCancel cancel) {
         for (int attempt = 0;; attempt++) {
             if (attempt != 0) {
                 try {
@@ -464,7 +475,7 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            long qryReqId = reqIdGen.incrementAndGet();
+            final long qryReqId = reqIdGen.incrementAndGet();
 
             QueryRun r = new QueryRun();
 
@@ -510,6 +521,8 @@ public class GridReduceQueryExecutor {
                 nodes = Collections.singleton(F.rand(nodes));
             }
 
+            final Collection<ClusterNode> finalNodes = nodes;
+
             int tblIdx = 0;
 
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
@@ -545,6 +558,8 @@ public class GridReduceQueryExecutor {
             runs.put(qryReqId, r);
 
             try {
+                cancel.checkCancelled();
+
                 if (ctx.clientDisconnected()) {
                     throw new CacheException("Query was cancelled, client node disconnected.",
                         new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
@@ -567,12 +582,20 @@ public class GridReduceQueryExecutor {
                         mapQry.marshallParams(m);
                 }
 
+                cancel.set(new Runnable() {
+                    @Override public void run() {
+                        send(finalNodes, new GridQueryCancelRequest(qryReqId), null);
+                    }
+                });
+
                 boolean retry = false;
 
                 if (send(nodes,
-                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
+                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null, timeoutMillis), partsMap)) {
                     awaitAllReplies(r, nodes);
 
+                    cancel.checkCancelled();
+
                     Object state = r.state.get();
 
                     if (state != null) {
@@ -582,6 +605,9 @@ public class GridReduceQueryExecutor {
                             if (err.getCause() instanceof IgniteClientDisconnectedException)
                                 throw err;
 
+                            if (wasCancelled(err))
+                                throw new QueryCancelledException(); // Throw correct exception.
+
                             throw new CacheException("Failed to run map query remotely.", err);
                         }
 
@@ -627,6 +653,8 @@ public class GridReduceQueryExecutor {
                         resIter = res.iterator();
                     }
                     else {
+                        cancel.checkCancelled();
+
                         GridCacheSqlQuery rdc = qry.reduceQuery();
 
                         // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
@@ -634,17 +662,14 @@ public class GridReduceQueryExecutor {
                             r.conn,
                             rdc.query(),
                             F.asList(rdc.parameters()),
-                            false);
+                            false,
+                            timeoutMillis,
+                            cancel);
 
                         resIter = new Iter(res);
                     }
                 }
 
-                for (GridMergeIndex idx : r.idxs) {
-                    if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes.
-                        send(nodes, new GridQueryCancelRequest(qryReqId), null);
-                }
-
                 if (retry) {
                     if (Thread.currentThread().isInterrupted())
                         throw new IgniteInterruptedCheckedException("Query was interrupted.");
@@ -657,8 +682,12 @@ public class GridReduceQueryExecutor {
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
 
-                if (e instanceof CacheException)
+                if (e instanceof CacheException) {
+                    if (wasCancelled((CacheException)e))
+                        throw new CacheException("Failed to run reduce query locally.", new QueryCancelledException());
+
                     throw (CacheException)e;
+                }
 
                 Throwable cause = e;
 
@@ -673,6 +702,9 @@ public class GridReduceQueryExecutor {
                 throw new CacheException("Failed to run reduce query locally.", cause);
             }
             finally {
+                // Make sure any activity related to current attempt is cancelled.
+                cancelRemoteQueriesIfNeeded(nodes, r, qryReqId);
+
                 if (!runs.remove(qryReqId, r))
                     U.warn(log, "Query run was already removed: " + qryReqId);
 
@@ -685,6 +717,33 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Tells whether the exception was marked as a cancellation through its suppressed exceptions.
+     *
+     * @param e Exception to inspect.
+     * @return {@code true} if the first suppressed exception is a {@link QueryCancelledException}.
+     */
+    private boolean wasCancelled(CacheException e) {
+        Throwable[] suppressed = e.getSuppressed();
+        return suppressed.length != 0 && suppressed[0] instanceof QueryCancelledException;
+    }
+
+    /**
+     * Sends a cancel request to the nodes if some merge index of the run is not fully fetched.
+     * @param nodes Nodes to send the cancel request to.
+     * @param r Query run whose merge indexes are checked.
+     * @param qryReqId Query request ID put into the cancel request.
+     */
+    private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes, QueryRun r, long qryReqId) {
+        for (GridMergeIndex mergeIdx : r.idxs) {
+            if (mergeIdx.fetchedAll())
+                continue;
+
+            send(nodes, new GridQueryCancelRequest(qryReqId), null); // Sent at most once per call.
+            return;
+        }
+    }
+
+    /**
      * @param r Query run.
      * @param nodes Nodes to check periodically if they alive.
      * @throws IgniteInterruptedCheckedException If interrupted.
@@ -986,7 +1045,7 @@ public class GridReduceQueryExecutor {
         List<List<?>> lists = new ArrayList<>();
 
         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
-            ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null, false);
+            ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null, false, 0, null);
 
             lists.add(F.asList(getPlan(rs)));
         }
@@ -1005,7 +1064,9 @@ public class GridReduceQueryExecutor {
             c,
             "EXPLAIN " + rdc.query(),
             F.asList(rdc.parameters()),
-            false);
+            false,
+            0,
+            null);
 
         lists.add(F.asList(getPlan(rs)));
 
@@ -1240,4 +1301,4 @@ public class GridReduceQueryExecutor {
             return res;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/80abd1b7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
new file mode 100644
index 0000000..3fec966
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheDistributedQueryCancelSelfTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests distributed SQL query cancel related scenarios.
+ */
+public class IgniteCacheDistributedQueryCancelSelfTest extends GridCommonAbstractTest {
+    /** Number of grids started before each test. */
+    private static final int GRIDS_COUNT = 3;
+
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of entries put into the cache. */
+    public static final int CACHE_SIZE = 10_000;
+
+    /** Length, in characters, of each cached string value. */
+    public static final int VAL_SIZE = 16;
+
+    /** Joins the String table with itself without any join condition. */
+    private static final String QUERY = "select a._val, b._val from String a, String b";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(GRIDS_COUNT);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+        TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+        spi.setIpFinder(IP_FINDER);
+
+        CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>();
+        ccfg.setIndexedTypes(Integer.class, String.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        if ("client".equals(gridName)) // Only the grid named "client" is switched to client mode.
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** Runs {@link #QUERY} on a client while a background task stops every non-client grid. */
+    public void testQueryCancelsOnGridShutdown() throws Exception {
+        try (Ignite client = startGrid("client")) {
+
+            IgniteCache<Object, Object> cache = client.cache(null);
+
+            assertEquals(0, cache.localSize());
+
+            int p = 1; // Next tenth of the load to report in the log.
+            for (int i = 1; i <= CACHE_SIZE; i++) {
+                char[] tmp = new char[VAL_SIZE];
+                Arrays.fill(tmp, ' ');
+                cache.put(i, new String(tmp));
+
+                if (i / (float)CACHE_SIZE >= p / 10f) {
+                    log().info("Loaded " + i + " of " + CACHE_SIZE);
+
+                    p++;
+                }
+            }
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(QUERY);
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        Thread.sleep(1_500); // Presumably gives the query time to start before grids stop - confirm.
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteException(e);
+                    }
+
+                    for (Ignite g : G.allGrids())
+                        if (!g.configuration().getDiscoverySpi().isClientMode())
+                            stopGrid(g.name(), true);
+                }
+            }, 1);
+
+            try {
+                final QueryCursor<List<?>> cursor = cache.query(qry);
+
+                cursor.iterator(); // Only the iterator is requested; the rows are not consumed.
+            }
+            catch (CacheException ignored) {
+                // No-op.
+            }
+
+            fut.get();
+
+            // Test must exit gracefully.
+        }
+    }
+
+    /** Runs a query whose SQL function throws and expects the failure cause to be a {@link CacheException}. */
+    public void testQueryResponseFailCode() throws Exception {
+        try (Ignite client = startGrid("client")) {
+
+            CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+            cfg.setSqlFunctionClasses(Functions.class);
+            cfg.setIndexedTypes(Integer.class, Integer.class);
+            cfg.setName("test");
+
+            IgniteCache<Integer, Integer> cache = client.getOrCreateCache(cfg);
+
+            cache.put(1, 1);
+
+            QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery("select fail() from Integer"));
+
+            try {
+                qry.getAll();
+
+                fail(); // Reached only if getAll() returned normally.
+            }
+            catch (Exception e) {
+                assertTrue(e.getCause() instanceof CacheException);
+            }
+        }
+    }
+
+    /** SQL function holder registered on the {@code test} cache configuration. */
+    public static class Functions {
+        /** @return Never returns normally; always throws {@link IllegalArgumentException}. */
+        @QuerySqlFunction
+        public static int fail() {
+            throw new IllegalArgumentException();
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message