ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [42/50] [abbrv] ignite git commit: Merge branch 'ignite-1.6.10' into ignite-1.7.3
Date Wed, 09 Nov 2016 08:38:56 GMT
Merge branch 'ignite-1.6.10' into ignite-1.7.3


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5fac786b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5fac786b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5fac786b

Branch: refs/heads/master
Commit: 5fac786b6dbb179127ac725180acb54d0f6f4b0a
Parents: d0f4b23 a863eee
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Oct 31 21:31:05 2016 +0300
Committer: thatcoach <ppozerov@list.ru>
Committed: Mon Oct 31 21:58:20 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteScheduler.java |  13 +
 .../cache/query/QueryCancelledException.java    |  35 +++
 .../apache/ignite/cache/query/QueryCursor.java  |   8 +-
 .../ignite/cache/query/SqlFieldsQuery.java      |  26 ++
 .../org/apache/ignite/cache/query/SqlQuery.java |  25 ++
 .../ignite/internal/IgniteSchedulerImpl.java    |  18 ++
 .../processors/cache/QueryCursorImpl.java       |  92 +++++--
 .../closure/GridClosureProcessor.java           |   1 +
 .../processors/query/GridQueryCancel.java       |  84 +++++++
 .../processors/query/GridQueryFieldsResult.java |   3 +-
 .../query/GridQueryFieldsResultAdapter.java     |   3 +-
 .../processors/query/GridQueryIndexing.java     |  11 +-
 .../processors/query/GridQueryProcessor.java    | 105 ++++++--
 .../twostep/messages/GridQueryFailResponse.java |  34 ++-
 .../h2/twostep/messages/GridQueryRequest.java   |  31 ++-
 .../junits/GridTestKernalContext.java           |   1 -
 .../processors/query/h2/IgniteH2Indexing.java   | 160 +++++++++---
 .../query/h2/twostep/GridMapQueryExecutor.java  |  66 +++--
 .../h2/twostep/GridReduceQueryExecutor.java     | 117 ++++++---
 ...niteCacheDistributedQueryCancelSelfTest.java | 176 +++++++++++++
 ...butedQueryStopOnCancelOrTimeoutSelfTest.java | 248 +++++++++++++++++++
 ...cheQueryAbstractDistributedJoinSelfTest.java |   7 +
 .../IgniteCacheQueryNodeRestartSelfTest2.java   | 125 ++++++----
 ...nCancelOrTimeoutDistributedJoinSelfTest.java |   7 +
 ...eCacheLocalQueryCancelOrTimeoutSelfTest.java | 158 ++++++++++++
 .../h2/GridIndexingSpiAbstractSelfTest.java     |   4 +-
 .../IgniteCacheQuerySelfTestSuite2.java         |   8 +
 27 files changed, 1388 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 48dab6b,d1a5117..d3f85af
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@@ -56,12 -58,9 +58,15 @@@ public final class SqlFieldsQuery exten
      /** Collocation flag. */
      private boolean collocated;
  
+     /** Query timeout in millis. */
+     private int timeout;
+ 
 +    /** */
 +    private boolean enforceJoinOrder;
 +
 +    /** */
 +    private boolean distributedJoins;
 +
      /**
       * Constructs SQL fields query.
       *

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index f809b8d,51c6cb5..83e171d
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@@ -43,9 -44,9 +44,12 @@@ public final class SqlQuery<K, V> exten
      @GridToStringInclude
      private Object[] args;
  
+     /** Timeout in millis. */
+     private int timeout;
+ 
 +    /** */
 +    private boolean distributedJoins;
 +
      /**
       * Constructs query for the given type name and SQL query.
       *

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 643cb8c,b1b3c68..6bffa5d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@@ -80,13 -80,15 +80,15 @@@ public interface GridQueryIndexing 
       * @param spaceName Space name.
       * @param qry Query.
       * @param params Query parameters.
 -     * @param filters Space name and key filters.
 +     * @param filter Space name and key filter.
 +     * @param enforceJoinOrder Enforce join order of tables in the query.
+      * @param timeout Query timeout in milliseconds.
+      * @param cancel Query cancel.
       * @return Query result.
       * @throws IgniteCheckedException If failed.
       */
 -    public GridQueryFieldsResult execute(@Nullable String spaceName, String qry,
 -        Collection<Object> params, IndexingQueryFilter filters, int timeout, GridQueryCancel cancel)
 -        throws IgniteCheckedException;
 +    public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
-         Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder) throws IgniteCheckedException;
++        Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout, GridQueryCancel cancel) throws IgniteCheckedException;
  
      /**
       * Executes regular query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 12cf962,3d185c6..27c0b71
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@@ -17,6 -17,6 +17,7 @@@
  
  package org.apache.ignite.internal.processors.query;
  
++import java.util.concurrent.TimeUnit;
  import java.lang.reflect.AccessibleObject;
  import java.lang.reflect.Field;
  import java.lang.reflect.Member;
@@@ -719,6 -740,43 +734,43 @@@ public class GridQueryProcessor extend
      }
  
      /**
+      * @param space Space.
+      * @param clause Clause.
+      * @param params Parameters collection.
+      * @param resType Result type.
+      * @param filters Filters.
+      * @return Key/value rows.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("unchecked")
+     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
+         final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
+         throws IgniteCheckedException {
+         checkEnabled();
+ 
+         if (!busyLock.enterBusy())
+             throw new IllegalStateException("Failed to execute query (grid is stopping).");
+ 
+         try {
+             final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
+ 
+             return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
+                 @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
+                     TypeDescriptor type = typesByName.get(new TypeName(space, resType));
+ 
+                     if (type == null || !type.registered())
+                         throw new CacheException("Failed to find SQL table for type: " + resType);
+ 
 -                    return idx.query(space, clause, params, type, filters);
++                    return idx.queryLocalSql(space, clause, params, type, filters);
+                 }
+             }, false);
+         }
+         finally {
+             busyLock.leaveBusy();
+         }
+     }
+ 
+     /**
       * @param cctx Cache context.
       * @param qry Query.
       * @return Cursor.
@@@ -868,6 -925,24 +920,24 @@@
      }
  
      /**
+      * @param timeout Timeout.
+      * @param timeUnit Time unit.
+      * @return Converted time.
+      */
+     public static int validateTimeout(int timeout, TimeUnit timeUnit) {
+         A.ensure(timeUnit != TimeUnit.MICROSECONDS && timeUnit != TimeUnit.NANOSECONDS,
 -                "timeUnit minimal resolution is millisecond.");
++            "timeUnit minimal resolution is millisecond.");
+ 
+         A.ensure(timeout >= 0, "timeout value should be non-negative.");
+ 
+         long tmp = TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
+ 
+         A.ensure(timeout <= Integer.MAX_VALUE, "timeout value too large.");
+ 
+         return (int) tmp;
+     }
+ 
+     /**
       * Closeable iterator.
       */
      private interface ClIter<X> extends AutoCloseable, Iterator<X> {
@@@ -888,14 -963,13 +958,13 @@@
  
              return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
                  @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                     String space = cctx.name();
-                     String sql = qry.getSql();
-                     Object[] args = qry.getArgs();
+                     final String space = cctx.name();
+                     final String sql = qry.getSql();
+                     final Object[] args = qry.getArgs();
+                     final GridQueryCancel cancel = new GridQueryCancel();
  
 -                    final GridQueryFieldsResult res = idx.execute(space, sql, F.asList(args),
 -                        idx.backupFilter(null, requestTopVer.get(), null), qry.getTimeout(), cancel);
 +                    final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
-                         idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder());
- 
-                     sendQueryExecutedEvent(sql, args);
++                        idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
  
                      QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
                          @Override public Iterator<List<?>> iterator() {
@@@ -2412,4 -2501,4 +2487,4 @@@
      private enum IndexType {
          ASC, DESC, TEXT
      }
--}
++}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index f7de86c,550cf9b..6e42f1c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@@ -167,9 -169,16 +173,16 @@@ public class GridQueryRequest implement
      }
  
      /**
+      * @return Timeout.
+      */
+     public int timeout() {
+         return this.timeout;
+     }
+ 
+     /**
       * @return Queries.
       */
 -    public Collection<GridCacheSqlQuery> queries() throws IgniteCheckedException {
 +    public Collection<GridCacheSqlQuery> queries() {
          return qrys;
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 331b6f9,ab332c1..ed42bc6
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -75,8 -76,8 +77,9 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
  import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
  import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
  import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
  import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
  import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
@@@ -132,9 -128,9 +135,10 @@@ import org.h2.index.Index
  import org.h2.index.SpatialIndex;
  import org.h2.jdbc.JdbcConnection;
  import org.h2.jdbc.JdbcPreparedStatement;
+ import org.h2.jdbc.JdbcStatement;
  import org.h2.message.DbException;
  import org.h2.mvstore.cache.CacheLongKeyLIRS;
 +import org.h2.result.SortOrder;
  import org.h2.server.web.WebServer;
  import org.h2.table.Column;
  import org.h2.table.IndexColumn;
@@@ -390,7 -356,7 +394,7 @@@ public class IgniteH2Indexing implement
  
              PreparedStatement stmt = cache.get(sql);
  
-             if (stmt != null && !stmt.isClosed()) {
 -            if (stmt != null && !((JdbcStatement)stmt).wasCancelled()) {
++            if (stmt != null && !stmt.isClosed() && !((JdbcStatement)stmt).wasCancelled()) {
                  assert stmt.getConnection() == c;
  
                  return stmt;
@@@ -764,31 -733,36 +768,36 @@@
  
      /** {@inheritDoc} */
      @SuppressWarnings("unchecked")
 -    @Override public GridQueryFieldsResult execute(@Nullable final String spaceName, final String qry,
 -        @Nullable final Collection<Object> params, final IndexingQueryFilter filters,
 +    @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
-         @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder)
++        @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+         final int timeout, final GridQueryCancel cancel)
          throws IgniteCheckedException {
-         Connection conn = connectionForSpace(spaceName);
 -        setFilters(filters);
++        final Connection conn = connectionForSpace(spaceName);
  
 -        try {
 -            final Connection conn = connectionForThread(schema(spaceName));
 +        initLocalQueryContext(conn, enforceJoinOrder, filters);
  
 +        try {
-             ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, qry, params, true);
+             final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
  
-             List<GridQueryFieldMetadata> meta = null;
+             List<GridQueryFieldMetadata> meta;
  
-             if (rs != null) {
-                 try {
-                     meta = meta(rs.getMetaData());
-                 }
-                 catch (SQLException e) {
-                     throw new IgniteCheckedException("Failed to get meta data.", e);
-                 }
+             try {
+                 meta = meta(stmt.getMetaData());
+             }
+             catch (SQLException e) {
+                 throw new IgniteCheckedException("Cannot prepare query metadata", e);
              }
  
-             return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs));
+             return new GridQueryFieldsResultAdapter(meta, null) {
+                 @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
+                     ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
+ 
+                     return new FieldsIterator(rs);
+                 }
+             };
          }
          finally {
 -            setFilters(null);
 +            GridH2QueryContext.clearThreadLocal();
          }
      }
  
@@@ -877,12 -851,54 +886,52 @@@
  
          bindParameters(stmt, params);
  
+         return stmt;
+     }
+ 
+     /**
+      * Executes sql query statement.
+      *
+      * @param conn Connection,.
+      * @param stmt Statement.
+      * @param cancel Query cancel.
+      * @return Result.
+      * @throws IgniteCheckedException If failed.
+      */
+     private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
+         int timeoutMillis, @Nullable GridQueryCancel cancel)
+         throws IgniteCheckedException {
+ 
+         if (timeoutMillis > 0)
+             ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(timeoutMillis);
+ 
+         if (cancel != null) {
+             cancel.set(new Runnable() {
+                 @Override public void run() {
+                     try {
+                         stmt.cancel();
 -                    } catch (SQLException ignored) {
++                    }
++                    catch (SQLException ignored) {
+                         // No-op.
+                     }
+                 }
+             });
+         }
+ 
          try {
              return stmt.executeQuery();
          }
          catch (SQLException e) {
+             // Throw special exception.
+             if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
+                 throw new QueryCancelledException();
+ 
              throw new IgniteCheckedException("Failed to execute SQL query.", e);
          }
+         finally {
 -            if(cancel != null)
 -                cancel.setCompleted();
 -
+             if (timeoutMillis > 0)
+                 ((Session)((JdbcConnection)conn).getSession()).setQueryTimeout(0);
+         }
      }
  
      /**
@@@ -982,14 -1034,10 +1057,14 @@@
          if (tbl == null)
              throw new CacheException("Failed to find SQL table for type: " + type.name());
  
 -        setFilters(filters);
 +        String sql = generateQuery(qry, tbl);
 +
 +        Connection conn = connectionForThread(tbl.schemaName());
 +
 +        initLocalQueryContext(conn, false, filter);
  
          try {
-             ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true);
 -            ResultSet rs = executeQuery(spaceName, qry, params, tbl, null);
++            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
  
              return new KeyValIterator(rs);
          }
@@@ -998,18 -1046,20 +1073,20 @@@
          }
      }
  
 -    /** {@inheritDoc} */
 -    private Iterable<List<?>> doQueryTwoStep(final GridCacheContext<?, ?> cctx, final GridCacheTwoStepQuery qry,
 -        final boolean keepCacheObj,
 +    /**
 +     * @param cctx Cache context.
 +     * @param qry Query.
 +     * @param keepCacheObj Flag to keep cache object.
 +     * @param enforceJoinOrder Enforce join order of tables.
 +     * @return Iterable result.
 +     */
 +    private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
-         final boolean keepCacheObj, final boolean enforceJoinOrder) {
++        final boolean keepCacheObj, final boolean enforceJoinOrder,
+         final int timeoutMillis,
+         final GridQueryCancel cancel) {
          return new Iterable<List<?>>() {
              @Override public Iterator<List<?>> iterator() {
-                 return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder);
 -                try {
 -                    return rdcQryExec.query(cctx, qry, keepCacheObj, timeoutMillis, cancel);
 -                }
 -                finally {
 -                    if (cancel != null)
 -                        cancel.setCompleted();
 -                }
++                return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel);
              }
          };
      }
@@@ -1038,8 -1088,10 +1115,11 @@@
  
          fqry.setArgs(qry.getArgs());
          fqry.setPageSize(qry.getPageSize());
 +        fqry.setDistributedJoins(qry.isDistributedJoins());
  
+         if(qry.getTimeout() > 0)
+             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
+ 
          final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
  
          final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
@@@ -1203,8 -1198,10 +1283,10 @@@
  
          twoStepQry.pageSize(qry.getPageSize());
  
+         GridQueryCancel cancel = new GridQueryCancel();
+ 
          QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-             runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder));
 -            doQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), qry.getTimeout(), cancel), cancel);
++            runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel);
  
          cursor.fieldsMeta(meta);
  
@@@ -1263,7 -1271,7 +1345,7 @@@
          if (!upper.startsWith("FROM"))
              from = " FROM " + t +
                  (upper.startsWith("WHERE") || upper.startsWith("ORDER") || upper.startsWith("LIMIT") ?
--                " " : " WHERE ");
++                    " " : " WHERE ");
  
          qry = "SELECT " + t + "." + KEY_FIELD_NAME + ", " + t + "." + VAL_FIELD_NAME + from + qry;
  
@@@ -1561,29 -1552,10 +1643,29 @@@
          if (tbl == null)
              return -1;
  
 -        IgniteSpiCloseableIterator<List<?>> iter = execute(spaceName,
 -                "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, null, 0, null).iterator();
 +        Connection conn = connectionForSpace(spaceName);
 +
 +        setupConnection(conn, false, false);
 +
-         ResultSet rs = executeSqlQuery(conn,
-             "SELECT COUNT(*) FROM " + tbl.fullTableName(), null, false);
- 
 +        try {
++            ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(), false),
++                0, null);
++
 +            if (!rs.next())
 +                throw new IllegalStateException();
  
 -        return ((Number)iter.next().get(0)).longValue();
 +            return rs.getLong(1);
 +        }
 +        catch (SQLException e) {
 +            throw new IgniteCheckedException(e);
 +        }
 +    }
 +
 +    /**
 +     * @return Busy lock.
 +     */
 +    public GridSpinBusyLock busyLock() {
 +        return busyLock;
      }
  
      /**
@@@ -1875,11 -1765,11 +1957,11 @@@
                  U.error(log, "Failed to drop schema on cache stop (will ignore): " + U.maskName(ccfg.getName()), e);
              }
  
 -            for (Iterator<Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
 -                 it.hasNext();) {
 -                Map.Entry<T3<String, String, Boolean>, TwoStepCachedQuery> e = it.next();
 +            for (Iterator<Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery>> it = twoStepCache.entrySet().iterator();
-                  it.hasNext();) {
++                it.hasNext();) {
 +                Map.Entry<TwoStepCachedQueryKey, TwoStepCachedQuery> e = it.next();
  
 -                if (F.eq(e.getKey().get1(), ccfg.getName()))
 +                if (F.eq(e.getKey().space, ccfg.getName()))
                      it.remove();
              }
          }
@@@ -3000,4 -2715,10 +3082,10 @@@
              lastUsage = U.currentTimeMillis();
          }
      }
- }
+ 
+     /** {@inheritDoc} */
+     @Override public void cancelAllQueries() {
+         for (Connection conn : conns)
+             U.close(conn, log);
+     }
 -}
++}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index bb5e419,1f05bf7..0314b3d
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@@ -35,6 -32,6 +35,7 @@@ import javax.cache.CacheException
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.IgniteException;
  import org.apache.ignite.IgniteLogger;
++import org.apache.ignite.cache.query.QueryCancelledException;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.CacheQueryExecutedEvent;
  import org.apache.ignite.events.CacheQueryReadEvent;
@@@ -51,12 -49,9 +52,13 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
  import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
  import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
  import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@@ -158,8 -144,8 +160,8 @@@ public class GridMapQueryExecutor 
                  if (nodeRess == null)
                      return;
  
 -                for (QueryResults ress : nodeRess.values())
 -                    ress.cancel();
 +                for (QueryResults ress : nodeRess.results().values())
-                     ress.cancel();
++                    ress.cancel(true);
              }
          }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
  
@@@ -237,7 -208,7 +239,7 @@@
          if (results == null)
              return;
  
--        results.cancel();
++        results.cancel(true);
      }
  
      /**
@@@ -409,226 -393,132 +411,231 @@@
       * @param req Query request.
       */
      private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
 -        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
 +        List<Integer> cacheIds;
  
 -        QueryResults qr = null;
 +        if (req.extraSpaces() != null) {
 +            cacheIds = new ArrayList<>(req.extraSpaces().size() + 1);
  
 -        List<GridReservable> reserved = new ArrayList<>();
 +            cacheIds.add(CU.cacheId(req.space()));
  
 -        try {
 -            // Unmarshall query params.
 -            Collection<GridCacheSqlQuery> qrys;
 +            for (String extraSpace : req.extraSpaces())
 +                cacheIds.add(CU.cacheId(extraSpace));
 +        }
 +        else
 +            cacheIds = Collections.singletonList(CU.cacheId(req.space()));
 +
 +        onQueryRequest0(node,
 +            req.requestId(),
 +            req.queries(),
 +            cacheIds,
 +            req.topologyVersion(),
 +            null,
 +            req.partitions(),
 +            null,
 +            req.pageSize(),
-             false);
++            false,
++            req.timeout());
 +    }
  
 -            try {
 -                qrys = req.queries();
 +    /**
 +     * @param node Node.
 +     * @param req Query request.
 +     */
 +    private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
 +        Map<UUID,int[]> partsMap = req.partitions();
 +        int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
 +
 +        onQueryRequest0(node,
 +            req.requestId(),
 +            req.queries(),
 +            req.caches(),
 +            req.topologyVersion(),
 +            partsMap,
 +            parts,
 +            req.tables(),
 +            req.pageSize(),
-             req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
++            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
++            req.timeout());
 +    }
  
 -                if (!node.isLocal()) {
 -                    Marshaller m = ctx.config().getMarshaller();
 +    /**
 +     * @param node Node authored request.
 +     * @param reqId Request ID.
 +     * @param qrys Queries to execute.
 +     * @param cacheIds Caches which will be affected by these queries.
 +     * @param topVer Topology version.
 +     * @param partsMap Partitions map for unstable topology.
 +     * @param parts Explicit partitions for current node.
 +     * @param tbls Tables.
 +     * @param pageSize Page size.
 +     * @param distributedJoins Can we expect distributed joins to be ran.
 +     */
 +    private void onQueryRequest0(
 +        ClusterNode node,
 +        long reqId,
 +        Collection<GridCacheSqlQuery> qrys,
 +        List<Integer> cacheIds,
 +        AffinityTopologyVersion topVer,
 +        Map<UUID, int[]> partsMap,
 +        int[] parts,
 +        Collection<String> tbls,
 +        int pageSize,
-         boolean distributedJoins
++        boolean distributedJoins,
++        int timeout
 +    ) {
 +        // Prepare to run queries.
 +        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
  
 -                    for (GridCacheSqlQuery qry : qrys)
 -                        qry.unmarshallParams(m, ctx);
 -                }
 -            }
 -            catch (IgniteCheckedException e) {
 -                throw new CacheException("Failed to unmarshall parameters.", e);
 -            }
 +        if (mainCctx == null)
 +            throw new CacheException("Failed to find cache.");
 +
 +        NodeResults nodeRess = resultsForNode(node.id());
  
 -            List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
 +        QueryResults qr = null;
  
 -            // Topology version can be null in rolling restart with previous version!
 -            final AffinityTopologyVersion topVer = req.topologyVersion();
 +        List<GridReservable> reserved = new ArrayList<>();
  
 +        try {
              if (topVer != null) {
                  // Reserve primary for topology version or explicit partitions.
 -                if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
 -                    sendRetry(node, req.requestId());
 +                if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
 +                    sendRetry(node, reqId);
  
                      return;
                  }
              }
  
 -            // Prepare to run queries.
 -            GridCacheContext<?,?> mainCctx = cacheContext(req.space());
 +            qr = new QueryResults(reqId, qrys.size(), mainCctx);
  
 -            if (mainCctx == null)
 -                throw new CacheException("Failed to find cache: " + req.space());
 +            if (nodeRess.results().put(reqId, qr) != null)
 +                throw new IllegalStateException();
  
 -            qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
 +            // Prepare query context.
 +            GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
 +                node.id(),
 +                reqId,
 +                mainCctx.isReplicated() ? REPLICATED : MAP)
 +                .filter(h2.backupFilter(topVer, parts))
 +                .partitionsMap(partsMap)
 +                .distributedJoins(distributedJoins)
 +                .pageSize(pageSize)
 +                .topologyVersion(topVer)
 +                .reservations(reserved);
  
 -            if (nodeRess.put(req.requestId(), qr) != null)
 -                throw new IllegalStateException();
 +            List<GridH2Table> snapshotedTbls = null;
  
 -            h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
 +            if (!F.isEmpty(tbls)) {
 +                snapshotedTbls = new ArrayList<>(tbls.size());
  
 -            // TODO Prepare snapshots for all the needed tables before the run.
 +                for (String identifier : tbls) {
 +                    GridH2Table tbl = h2.dataTable(identifier);
  
 -            // Run queries.
 -            int i = 0;
 +                    Objects.requireNonNull(tbl, identifier);
  
 -            for (GridCacheSqlQuery qry : qrys) {
 -                ResultSet rs = h2.executeSqlQueryWithTimer(req.space(),
 -                        h2.connectionForSpace(req.space()),
 -                        qry.query(),
 -                        F.asList(qry.parameters()),
 -                        true,
 -                        req.timeout(),
 -                        qr.cancels[i]);
 +                    tbl.snapshotIndexes(qctx);
  
 -                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
 -                    ctx.event().record(new CacheQueryExecutedEvent<>(
 -                        node,
 -                        "SQL query executed.",
 -                        EVT_CACHE_QUERY_EXECUTED,
 -                        CacheQueryType.SQL.name(),
 -                        mainCctx.namex(),
 -                        null,
 -                        qry.query(),
 -                        null,
 -                        null,
 -                        qry.parameters(),
 -                        node.id(),
 -                        null));
 +                    snapshotedTbls.add(tbl);
                  }
 +            }
  
 -                assert rs instanceof JdbcResultSet : rs.getClass();
 +            Connection conn = h2.connectionForSpace(mainCctx.name());
  
 -                qr.addResult(i, qry, node.id(), rs);
 +            // Here we enforce join order to have the same behavior on all the nodes.
 +            h2.setupConnection(conn, distributedJoins, true);
  
 -                if (qr.canceled) {
 -                    qr.result(i).close();
 +            GridH2QueryContext.set(qctx);
  
 -                    return;
 +            // qctx is set, we have to release reservations inside of it.
 +            reserved = null;
 +
 +            try {
 +                if (nodeRess.cancelled(reqId)) {
 +                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 +
 +                    nodeRess.results().remove(reqId);
 +
-                     return;
++                    throw new QueryCancelledException();
                  }
  
 -                // Send the first page.
 -                sendNextPage(nodeRess, node, qr, i, req.pageSize());
 +                // Run queries.
 +                int i = 0;
 +
 +                boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 +
 +                for (GridCacheSqlQuery qry : qrys) {
 +                    ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
-                         F.asList(qry.parameters()), true);
++                        F.asList(qry.parameters()), true,
++                        timeout,
++                        qr.cancels[i]);
 +
 +                    if (evt) {
 +                        ctx.event().record(new CacheQueryExecutedEvent<>(
 +                            node,
 +                            "SQL query executed.",
 +                            EVT_CACHE_QUERY_EXECUTED,
 +                            CacheQueryType.SQL.name(),
 +                            mainCctx.namex(),
 +                            null,
 +                            qry.query(),
 +                            null,
 +                            null,
 +                            qry.parameters(),
 +                            node.id(),
 +                            null));
 +                    }
 +
 +                    assert rs instanceof JdbcResultSet : rs.getClass();
 +
 +                    qr.addResult(i, qry, node.id(), rs);
 +
 +                    if (qr.canceled) {
 +                        qr.result(i).close();
  
-                         return;
 -                i++;
++                        throw new QueryCancelledException();
 +                    }
 +
 +                    // Send the first page.
 +                    sendNextPage(nodeRess, node, qr, i, pageSize);
 +
 +                    i++;
 +                }
 +            }
 +            finally {
 +                GridH2QueryContext.clearThreadLocal();
 +
 +                if (!distributedJoins)
 +                    qctx.clearContext(false);
 +
 +                if (!F.isEmpty(snapshotedTbls)) {
 +                    for (GridH2Table dataTbl : snapshotedTbls)
 +                        dataTbl.releaseSnapshots();
 +                }
              }
          }
          catch (Throwable e) {
              if (qr != null) {
 -                nodeRess.remove(req.requestId(), qr);
 +                nodeRess.results().remove(reqId, qr);
  
--                qr.cancel();
++                qr.cancel(false);
              }
  
 -            U.error(log, "Failed to execute local query: " + req, e);
 +            if (X.hasCause(e, GridH2RetryException.class))
 +                sendRetry(node, reqId);
 +            else {
 +                U.error(log, "Failed to execute local query.", e);
  
 -            sendError(node, req.requestId(), e);
 +                sendError(node, reqId, e);
  
 -            if (e instanceof Error)
 -                throw (Error)e;
 +                if (e instanceof Error)
 +                    throw (Error)e;
 +            }
          }
          finally {
 -            h2.setFilters(null);
 -
 -            // Release reserved partitions.
 -            for (GridReservable r : reserved)
 -                r.release();
 -
 -            // Ensure all cancels state is correct.
 -            if (qr != null)
 -                for (int i = 0; i < qr.cancels.length; i++) {
 -                    GridQueryCancel cancel = qr.cancels[i];
 -
 -                    if (cancel != null)
 -                        cancel.setCompleted();
 -                }
 +            if (reserved != null) {
 +                // Release reserved partitions.
 +                for (int i = 0; i < reserved.size(); i++)
 +                    reserved.get(i).release();
 +            }
          }
      }
  
@@@ -661,12 -548,12 +668,24 @@@
       * @param req Request.
       */
      private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
 -        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
 +        NodeResults nodeRess = qryRess.get(node.id());
 +
-         QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId());
++        if (nodeRess == null) {
++            sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
  
-         if (qr == null || qr.canceled)
 -        QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
++            return;
++        } else if (nodeRess.cancelled(req.queryRequestId())) {
++            sendError(node, req.queryRequestId(), new QueryCancelledException());
++
++            return;
++        }
+ 
 -        if (qr == null || qr.canceled)
++        QueryResults qr = nodeRess.results().get(req.queryRequestId());
++
++        if (qr == null)
              sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
++        else if (qr.canceled)
++            sendError(node, req.queryRequestId(), new QueryCancelledException());
          else
              sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
      }
@@@ -854,9 -705,9 +880,9 @@@
          }
  
          /**
-          *
+          * Cancels the query.
           */
--        void cancel() {
++        void cancel(boolean forceQryCancel) {
              if (canceled)
                  return;
  
@@@ -865,8 -716,16 +891,18 @@@
              for (int i = 0; i < results.length(); i++) {
                  QueryResult res = results.get(i);
  
-                 if (res != null)
+                 if (res != null) {
                      res.close();
+ 
+                     continue;
+                 }
+ 
 -                GridQueryCancel cancel = cancels[i];
++                if (forceQryCancel) {
++                    GridQueryCancel cancel = cancels[i];
+ 
 -                if (cancel != null)
 -                    cancel.cancel();
++                    if (cancel != null)
++                        cancel.cancel();
++                }
              }
          }
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 04449ac,3fdbf42..3847373
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -46,6 -45,6 +45,7 @@@ import org.apache.ignite.IgniteCheckedE
  import org.apache.ignite.IgniteClientDisconnectedException;
  import org.apache.ignite.IgniteException;
  import org.apache.ignite.IgniteLogger;
++import org.apache.ignite.cache.query.QueryCancelledException;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.DiscoveryEvent;
  import org.apache.ignite.events.Event;
@@@ -64,9 -62,10 +64,10 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
  import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
  import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+ import org.apache.ignite.internal.processors.query.GridQueryCancel;
  import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
  import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 -import org.apache.ignite.cache.query.QueryCancelledException;
 +import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
  import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
  import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
@@@ -74,14 -73,11 +75,15 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
  import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
 +import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
  import org.apache.ignite.internal.util.GridSpinBusyLock;
 +import org.apache.ignite.internal.util.typedef.CIX2;
  import org.apache.ignite.internal.util.typedef.F;
++import org.apache.ignite.internal.util.typedef.X;
  import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteBiClosure;
  import org.apache.ignite.lang.IgniteFuture;
 -import org.apache.ignite.marshaller.Marshaller;
 +import org.apache.ignite.lang.IgniteProductVersion;
  import org.apache.ignite.plugin.extensions.communication.Message;
  import org.h2.command.ddl.CreateTableData;
  import org.h2.engine.Session;
@@@ -468,16 -456,13 +476,20 @@@ public class GridReduceQueryExecutor 
      /**
       * @param cctx Cache context.
       * @param qry Query.
 -     * @param keepBinary Keep binary.
 +     * @param keepPortable Keep portable.
 +     * @param enforceJoinOrder Enforce join order of tables.
-      * @return Cursor.
+      * @param timeoutMillis Timeout in milliseconds.
+      * @param cancel Query cancel.
+      * @return Rows iterator.
       */
 -    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary,
 -        int timeoutMillis, GridQueryCancel cancel) {
 +    public Iterator<List<?>> query(
 +        GridCacheContext<?, ?> cctx,
 +        GridCacheTwoStepQuery qry,
 +        boolean keepPortable,
-         boolean enforceJoinOrder
++        boolean enforceJoinOrder,
++        int timeoutMillis,
++        GridQueryCancel cancel
 +    ) {
          for (int attempt = 0;; attempt++) {
              if (attempt != 0) {
                  try {
@@@ -579,39 -575,27 +595,49 @@@
                          mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
                  }
  
-                 boolean retry = false;
- 
 -                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
 -                    Marshaller m = ctx.config().getMarshaller();
 +                IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer);
  
 -                    for (GridCacheSqlQuery mapQry : mapQrys)
 -                        mapQry.marshallParams(m);
 -                }
 +                final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
 +                final boolean distributedJoins = qry.distributedJoins();
  
+                 cancel.set(new Runnable() {
+                     @Override public void run() {
 -                        send(finalNodes, new GridQueryCancelRequest(qryReqId), null);
++                        send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
+                     }
+                 });
+ 
+                 boolean retry = false;
+ 
 +                if (oldStyle && distributedJoins)
 +                    throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
 +
                  if (send(nodes,
 -                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null, timeoutMillis), partsMap)) {
 +                    oldStyle ?
 +                        new GridQueryRequest(qryReqId,
 +                            r.pageSize,
 +                            space,
 +                            mapQrys,
 +                            topVer,
 +                            extraSpaces(space, qry.spaces()),
-                             null) :
++                            null,
++                            timeoutMillis) :
 +                        new GridH2QueryRequest()
 +                            .requestId(qryReqId)
 +                            .topologyVersion(topVer)
 +                            .pageSize(r.pageSize)
 +                            .caches(qry.caches())
 +                            .tables(distributedJoins ? qry.tables() : null)
 +                            .partitions(convert(partsMap))
 +                            .queries(mapQrys)
-                             .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0),
++                            .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
++                            .timeout(timeoutMillis),
 +                    oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
 +                    distributedJoins)
-                 ) {
++                    ) {
                      awaitAllReplies(r, nodes);
  
+                     cancel.checkCancelled();
+ 
                      Object state = r.state.get();
  
                      if (state != null) {
@@@ -663,30 -653,20 +692,34 @@@
                          resIter = res.iterator();
                      }
                      else {
+                         cancel.checkCancelled();
+ 
 -                        GridCacheSqlQuery rdc = qry.reduceQuery();
 +                        UUID locNodeId = ctx.localNodeId();
 +
 +                        h2.setupConnection(r.conn, false, enforceJoinOrder);
  
 -                        // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
 -                        ResultSet res = h2.executeSqlQueryWithTimer(space,
 -                            r.conn,
 -                            rdc.query(),
 -                            F.asList(rdc.parameters()),
 -                            false,
 -                            timeoutMillis,
 -                            cancel);
 +                        GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
 +                            .pageSize(r.pageSize).distributedJoins(false));
  
 -                        resIter = new Iter(res);
 +                        try {
 +                            if (qry.explain())
 +                                return explainPlan(r.conn, space, qry);
 +
 +                            GridCacheSqlQuery rdc = qry.reduceQuery();
 +
 +                            ResultSet res = h2.executeSqlQueryWithTimer(space,
 +                                r.conn,
 +                                rdc.query(),
 +                                F.asList(rdc.parameters()),
-                                 false);
++                                false,
++                                timeoutMillis,
++                                cancel);
 +
 +                            resIter = new Iter(res);
 +                        }
 +                        finally {
 +                            GridH2QueryContext.clearThreadLocal();
 +                        }
                      }
                  }
  
@@@ -699,16 -677,7 +730,7 @@@
                      continue;
                  }
  
-                 final Collection<ClusterNode> finalNodes = nodes;
- 
-                 return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable) {
-                     @Override public void close() throws Exception {
-                         super.close();
- 
-                         if (distributedJoins || !allIndexesFetched(r.idxs))
-                             send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
-                     }
-                 };
 -                return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary);
++                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
              }
              catch (IgniteCheckedException | RuntimeException e) {
                  U.closeQuiet(r.conn);
@@@ -741,19 -717,33 +770,43 @@@
      }
  
      /**
 +     * @param idxs Merge indexes.
 +     * @return {@code true} If all remote data was fetched.
 +     */
 +    private static boolean allIndexesFetched(List<GridMergeIndex> idxs) {
 +        for (int i = 0; i <  idxs.size(); i++) {
 +            if (!idxs.get(i).fetchedAll())
 +                return false;
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
+      * Returns true if the exception is triggered by query cancel.
+      *
+      * @param e Exception.
+      * @return {@code true} if exception is caused by cancel.
+      */
+     private boolean wasCancelled(CacheException e) {
 -        return e.getSuppressed() != null && e.getSuppressed().length > 0 &&
 -            e.getSuppressed()[0] instanceof QueryCancelledException;
++        return X.hasSuppressed(e, QueryCancelledException.class);
+     }
+ 
+     /**
 -     * Explicitly cancels remote queries.
 -     * @param nodes Nodes.
+      * @param r Query run.
+      * @param qryReqId Query id.
+      */
+     private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes, QueryRun r, long qryReqId) {
+         for (GridMergeIndex idx : r.idxs) {
+             if (!idx.fetchedAll()) {
 -                send(nodes, new GridQueryCancelRequest(qryReqId), null);
++                send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
+ 
+                 break;
+             }
+         }
+     }
+ 
+     /**
       * @param r Query run.
       * @param nodes Nodes to check periodically if they alive.
       * @throws IgniteInterruptedCheckedException If interrupted.
@@@ -1290,7 -1259,7 +1345,7 @@@
                  latch.countDown();
  
              for (GridMergeIndex idx : idxs) // Fail all merge indexes.
--                idx.fail(nodeId);
++                idx.fail(nodeId, o instanceof CacheException ? (CacheException) o : null);
          }
  
          /**
@@@ -1332,24 -1301,4 +1387,24 @@@
              return res;
          }
      }
 +
 +    /**
 +     *
 +     */
 +    private class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode,Message,Message> {
 +        /** */
 +        private final Map<ClusterNode,IntArray> partsMap;
 +
 +        /**
 +         * @param partsMap Partitions map.
 +         */
 +        private ExplicitPartitionsSpecializer(Map<ClusterNode,IntArray> partsMap) {
 +            this.partsMap = partsMap;
 +        }
 +
 +        /** {@inheritDoc} */
 +        @Override public Message apply(ClusterNode n, Message msg) {
 +            return copy(msg, n, partsMap);
 +        }
 +    }
- }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
index 0000000,0000000..be34a09
new file mode 100644
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java
@@@ -1,0 -1,0 +1,7 @@@
++package org.apache.ignite.internal.processors.cache.distributed.near;
++
++/**
++ * Created by vozerov on 31.10.2016.
++ */
++public class IgniteCacheQueryAbstractDistributedJoinSelfTest {
++}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
index 0000000,0000000..80bd62e
new file mode 100644
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java
@@@ -1,0 -1,0 +1,7 @@@
++package org.apache.ignite.internal.processors.cache.distributed.near;
++
++/**
++ * Created by vozerov on 31.10.2016.
++ */
++public class IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest {
++}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 760ee19,6e493ea..bcf8f9d
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@@ -349,8 -348,8 +349,8 @@@ public abstract class GridIndexingSpiAb
  
          // Fields query
          GridQueryFieldsResult fieldsRes =
 -            spi.execute("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
 -                "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, 0, null);
 +            spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
-             "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false);
++            "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null);
  
          String[] aliases = {"N1", "A1", "N2", "A2"};
          Object[] vals = { "Valera", 19, "Kolya", 25};
@@@ -451,8 -450,7 +451,8 @@@
                  time = now;
                  range *= 3;
  
 -                GridQueryFieldsResult res = spi.execute("A", sql, Arrays.<Object>asList(1, range), null, 0, null);
 +                GridQueryFieldsResult res = spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range), null,
-                     false);
++                    false, 0, null);
  
                  assert res.iterator().hasNext();
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/5fac786b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 568f880,9128f76..5722c01
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@@ -39,7 -41,12 +41,8 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQueryP2PEnabledSelfTest;
  import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedFieldsQuerySelfTest;
  import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
+ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQueryCancelOrTimeoutSelfTest;
  import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
 -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
 -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
 -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
 -import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryReplicatedSelfTest;
  import org.apache.ignite.internal.processors.query.h2.sql.BaseH2CompareQueryTest;
  import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
  import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;


Mime
View raw message