ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [13/50] [abbrv] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Thu, 28 Jul 2016 11:06:38 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
index 38e3839..8d31651 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperationType.java
@@ -119,7 +119,7 @@ public enum GridSqlOperationType {
 
         /** {@inheritDoc} */
         @Override public String getSql(GridSqlOperation operation) {
-            assert operation.opType().childrenCnt == 2;
+            assert operation.operationType().childrenCnt == 2;
 
             return '(' + operation.child(0).getSQL() + " " + delim + " " + operation.child(1).getSQL() + ')';
         }
@@ -132,7 +132,7 @@ public enum GridSqlOperationType {
 
         /** {@inheritDoc} */
         @Override public String getSql(GridSqlOperation operation) {
-            assert operation.opType().childrenCnt == 2;
+            assert operation.operationType().childrenCnt == 2;
 
             return "(INTERSECTS(" + operation.child(0).getSQL() + ", " + operation.child(1).getSQL() + "))";
         }
@@ -154,7 +154,7 @@ public enum GridSqlOperationType {
 
         /** {@inheritDoc} */
         @Override public String getSql(GridSqlOperation operation) {
-            assert operation.opType().childrenCnt == 1;
+            assert operation.operationType().childrenCnt == 1;
 
             return '(' + text + ' ' + operation.child().getSQL() + ')';
         }
@@ -176,7 +176,7 @@ public enum GridSqlOperationType {
 
         /** {@inheritDoc} */
         @Override public String getSql(GridSqlOperation operation) {
-            assert operation.opType().childrenCnt == 1;
+            assert operation.operationType().childrenCnt == 1;
 
             return '(' + operation.child().getSQL() + ' ' + text + ')';
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 7001717..a7451c1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -21,8 +21,10 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
+import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
 import org.h2.command.Command;
+import org.h2.command.CommandContainer;
 import org.h2.command.Prepared;
 import org.h2.command.dml.Explain;
 import org.h2.command.dml.Query;
@@ -48,6 +50,8 @@ import org.h2.expression.Parameter;
 import org.h2.expression.Subquery;
 import org.h2.expression.TableFunction;
 import org.h2.expression.ValueExpression;
+import org.h2.index.Index;
+import org.h2.index.ViewIndex;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.result.SortOrder;
 import org.h2.table.Column;
@@ -91,14 +95,16 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlType.fro
 @SuppressWarnings("TypeMayBeWeakened")
 public class GridSqlQueryParser {
     /** */
-    private static final GridSqlOperationType[] OPERATION_OP_TYPES = new GridSqlOperationType[] {CONCAT, PLUS, MINUS, MULTIPLY, DIVIDE, null, MODULUS};
+    private static final GridSqlOperationType[] OPERATION_OP_TYPES =
+        {CONCAT, PLUS, MINUS, MULTIPLY, DIVIDE, null, MODULUS};
 
     /** */
-    private static final GridSqlOperationType[] COMPARISON_TYPES = new GridSqlOperationType[] {
-        EQUAL, BIGGER_EQUAL, BIGGER, SMALLER_EQUAL,
+    private static final GridSqlOperationType[] COMPARISON_TYPES =
+        {EQUAL, BIGGER_EQUAL, BIGGER, SMALLER_EQUAL,
         SMALLER, NOT_EQUAL, IS_NULL, IS_NOT_NULL,
-        null, null, null, SPATIAL_INTERSECTS /* 11 */, null, null, null, null, EQUAL_NULL_SAFE /* 16 */, null, null, null, null,
-        NOT_EQUAL_NULL_SAFE /* 21 */};
+        null, null, null, SPATIAL_INTERSECTS /* 11 */,
+        null, null, null, null, EQUAL_NULL_SAFE /* 16 */,
+        null, null, null, null, NOT_EQUAL_NULL_SAFE /* 21 */};
 
     /** */
     private static final Getter<Select, Expression> CONDITION = getter(Select.class, "condition");
@@ -134,7 +140,7 @@ public class GridSqlQueryParser {
     private static final Getter<ConditionAndOr, Expression> ANDOR_RIGHT = getter(ConditionAndOr.class, "right");
 
     /** */
-    private static final Getter<TableView, Query> VIEW_QUERY = getter(TableView.class, "viewQuery");
+    public static final Getter<TableView, Query> VIEW_QUERY = getter(TableView.class, "viewQuery");
 
     /** */
     private static final Getter<TableFilter, String> ALIAS = getter(TableFilter.class, "alias");
@@ -218,7 +224,8 @@ public class GridSqlQueryParser {
     private static final Getter<Explain, Prepared> EXPLAIN_COMMAND = getter(Explain.class, "command");
 
     /** */
-    private static volatile Getter<Command, Prepared> prepared;
+    private static final Getter<Command, Prepared> PREPARED =
+        GridSqlQueryParser.<Command, Prepared>getter(CommandContainer.class, "prepared");
 
     /** */
     private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>();
@@ -227,22 +234,12 @@ public class GridSqlQueryParser {
      * @param stmt Prepared statement.
      * @return Parsed select.
      */
-    public static GridSqlQuery parse(JdbcPreparedStatement stmt) {
+    public static Prepared prepared(JdbcPreparedStatement stmt) {
         Command cmd = COMMAND.get(stmt);
 
-        Getter<Command, Prepared> p = prepared;
+        assert cmd instanceof CommandContainer;
 
-        if (p == null) {
-            Class<? extends Command> cls = cmd.getClass();
-
-            assert "CommandContainer".equals(cls.getSimpleName());
-
-            prepared = p = getter(cls, "prepared");
-        }
-
-        Prepared statement = p.get(cmd);
-
-        return new GridSqlQueryParser().parse(statement);
+        return PREPARED.get(cmd);
     }
 
     /**
@@ -255,11 +252,15 @@ public class GridSqlQueryParser {
             Table tbl = filter.getTable();
 
             if (tbl instanceof TableBase)
-                res = new GridSqlTable(tbl.getSchema().getName(), tbl.getName());
+                res = new GridSqlTable(tbl);
             else if (tbl instanceof TableView) {
                 Query qry = VIEW_QUERY.get((TableView)tbl);
 
-                res = new GridSqlSubquery(parse(qry));
+                Index idx = filter.getIndex();
+
+                Query idxQry = idx instanceof ViewIndex ? ((ViewIndex)idx).getQuery() : null;
+
+                res = new GridSqlSubquery(parse(qry, idxQry));
             }
             else if (tbl instanceof FunctionTable)
                 res = parseExpression(FUNC_EXPR.get((FunctionTable)tbl), false);
@@ -286,7 +287,7 @@ public class GridSqlQueryParser {
     /**
      * @param select Select.
      */
-    public GridSqlSelect parse(Select select) {
+    public GridSqlSelect parse(Select select, @Nullable Query idxQry) {
         GridSqlSelect res = (GridSqlSelect)h2ObjToGridObj.get(select);
 
         if (res != null)
@@ -305,6 +306,9 @@ public class GridSqlQueryParser {
 
         TableFilter filter = select.getTopTableFilter();
 
+        if (idxQry instanceof Select)
+            filter = ((Select)idxQry).getTopTableFilter();
+
         do {
             assert0(filter != null, select);
             assert0(filter.getNestedJoin() == null, select);
@@ -366,11 +370,35 @@ public class GridSqlQueryParser {
     }
 
     /**
-     * @param qry Select.
+     * @param qry Prepared.
+     * @return Query.
+     */
+    public static Query query(Prepared qry) {
+        if (qry instanceof Query)
+            return (Query)qry;
+
+        if (qry instanceof Explain)
+            return query(EXPLAIN_COMMAND.get((Explain)qry));
+
+        throw new CacheException("Unsupported query: " + qry);
+    }
+
+    /**
+     * @param qry Prepared.
+     * @return Query.
      */
     public GridSqlQuery parse(Prepared qry) {
+        return parse(qry, null);
+    }
+
+    /**
+     * @param qry Prepared statement (select, union or explain).
+     */
+    public GridSqlQuery parse(Prepared qry, @Nullable Query idxQry) {
+        assert qry != null;
+
         if (qry instanceof Select)
-            return parse((Select)qry);
+            return parse((Select)qry, idxQry);
 
         if (qry instanceof SelectUnion)
             return parse((SelectUnion)qry);
@@ -378,7 +406,7 @@ public class GridSqlQueryParser {
         if (qry instanceof Explain)
             return parse(EXPLAIN_COMMAND.get((Explain)qry)).explain(true);
 
-        throw new UnsupportedOperationException("Unknown query type: " + qry);
+        throw new CacheException("Unsupported query: " + qry);
     }
 
     /**
@@ -437,11 +465,12 @@ public class GridSqlQueryParser {
      */
     private GridSqlElement parseExpression0(Expression expression, boolean calcTypes) {
         if (expression instanceof ExpressionColumn) {
-            TableFilter tblFilter = ((ExpressionColumn)expression).getTableFilter();
-
-            GridSqlElement gridTblFilter = parseTable(tblFilter);
+            ExpressionColumn expCol = (ExpressionColumn)expression;
 
-            return new GridSqlColumn(gridTblFilter, expression.getColumnName(), expression.getSQL());
+            return new GridSqlColumn(expCol.getColumn(),
+                parseTable(expCol.getTableFilter()),
+                expression.getColumnName(),
+                expression.getSQL());
         }
 
         if (expression instanceof Alias)
@@ -475,12 +504,14 @@ public class GridSqlQueryParser {
 
             assert opType != null : COMPARISON_TYPE.get(cmp);
 
-            GridSqlElement left = parseExpression(COMPARISON_LEFT.get(cmp), calcTypes);
+            Expression leftExp = COMPARISON_LEFT.get(cmp);
+            GridSqlElement left = parseExpression(leftExp, calcTypes);
 
             if (opType.childrenCount() == 1)
                 return new GridSqlOperation(opType, left);
 
-            GridSqlElement right = parseExpression(COMPARISON_RIGHT.get(cmp), calcTypes);
+            Expression rightExp = COMPARISON_RIGHT.get(cmp);
+            GridSqlElement right = parseExpression(rightExp, calcTypes);
 
             return new GridSqlOperation(opType, left, right);
         }
@@ -685,7 +716,7 @@ public class GridSqlQueryParser {
      * Field getter.
      */
     @SuppressWarnings("unchecked")
-    private static class Getter<T, R> {
+    public static class Getter<T, R> {
         /** */
         private final Field fld;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 4d34ce8..7205a18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -23,18 +23,27 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.h2.command.Prepared;
 import org.h2.jdbc.JdbcPreparedStatement;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
 import org.h2.util.IntArray;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.isCollocated;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.COUNT;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.SUM;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlaceholder.EMPTY;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.prepared;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.query;
+import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.toArray;
 
 /**
  * Splits a single SQL query into two step map-reduce query.
@@ -136,26 +145,146 @@ public class GridSqlQuerySplitter {
     /**
      * @param stmt Prepared statement.
      * @param params Parameters.
-     * @param collocated Collocated query.
-     * @param igniteH2Indexing Indexing implementation.
+     * @param collocatedGrpBy Whether the query has collocated GROUP BY keys.
+     * @param distributedJoins If distributed joins are enabled.
      * @return Two step query.
      */
-    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params, boolean collocated, IgniteH2Indexing igniteH2Indexing) {
+    public static GridCacheTwoStepQuery split(
+        JdbcPreparedStatement stmt,
+        Object[] params,
+        final boolean collocatedGrpBy,
+        final boolean distributedJoins
+    ) {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
+        Set<String> tbls = new HashSet<>();
         Set<String> schemas = new HashSet<>();
 
+        final Prepared prepared = prepared(stmt);
+
+        GridSqlQuery qry = new GridSqlQueryParser().parse(prepared);
+
+        qry = collectAllTables(qry, schemas, tbls);
+
+        // Build resulting two step query.
+        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(schemas, tbls);
+
         // Map query will be direct reference to the original query AST.
         // Thus all the modifications will be performed on the original AST, so we should be careful when
         // nullifying or updating things, have to make sure that we will not need them in the original form later.
-        final GridSqlSelect mapQry = wrapUnion(collectAllSpaces(GridSqlQueryParser.parse(stmt), schemas));
+        final GridSqlSelect mapQry = wrapUnion(qry);
+
+        GridCacheSqlQuery rdc = split(res, 0, mapQry, params, collocatedGrpBy);
+
+        res.reduceQuery(rdc);
+
+        // We do not have to look at each map query separately here, because if
+        // the whole initial query is collocated, then all the map sub-queries
+        // will be collocated as well.
+        res.distributedJoins(distributedJoins && !isCollocated(query(prepared)));
+
+        return res;
+    }
+
+    /**
+     * @param el Either {@link GridSqlSelect#from()} or {@link GridSqlSelect#where()} elements.
+     */
+    private static void findAffinityColumnConditions(GridSqlElement el) {
+        if (el == null)
+            return;
+
+        el = GridSqlAlias.unwrap(el);
+
+        if (el instanceof GridSqlJoin) {
+            GridSqlJoin join = (GridSqlJoin)el;
+
+            findAffinityColumnConditions(join.leftTable());
+            findAffinityColumnConditions(join.rightTable());
+            findAffinityColumnConditions(join.on());
+        }
+        else if (el instanceof GridSqlOperation) {
+            GridSqlOperationType type = ((GridSqlOperation)el).operationType();
+
+            switch(type) {
+                case AND:
+                    findAffinityColumnConditions(el.child(0));
+                    findAffinityColumnConditions(el.child(1));
+
+                    break;
+
+                case EQUAL:
+                    findAffinityColumn(el.child(0));
+                    findAffinityColumn(el.child(1));
+            }
+        }
+    }
+
+    /**
+     * @param exp Possible affinity column expression.
+     */
+    private static void findAffinityColumn(GridSqlElement exp) {
+        if (exp instanceof GridSqlColumn) {
+            GridSqlColumn col = (GridSqlColumn)exp;
+
+            GridSqlElement from = col.expressionInFrom();
+
+            if (from instanceof GridSqlTable) {
+                GridSqlTable fromTbl = (GridSqlTable)from;
+
+                GridH2Table tbl = fromTbl.dataTable();
+
+                if (tbl != null) {
+                    IndexColumn affKeyCol = tbl.getAffinityKeyColumn();
+                    Column expCol = col.column();
+
+                    if (affKeyCol != null && expCol != null &&
+                        affKeyCol.column.getColumnId() == expCol.getColumnId()) {
+                        // Mark that table lookup will use affinity key.
+                        fromTbl.affinityKeyCondition(true);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @param qry Select.
+     * @return {@code true} If there is at least one partitioned table in FROM clause.
+     */
+    private static boolean hasPartitionedTableInFrom(GridSqlSelect qry) {
+        return findTablesInFrom(qry.from(), new IgnitePredicate<GridSqlElement>() {
+            @Override public boolean apply(GridSqlElement el) {
+                if (el instanceof GridSqlTable) {
+                    GridH2Table tbl = ((GridSqlTable)el).dataTable();
+
+                    assert tbl != null : el;
+
+                    GridCacheContext<?,?> cctx = tbl.rowDescriptor().context();
 
+                    return !cctx.isLocal() && !cctx.isReplicated();
+                }
+
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @param res Resulting two step query.
+     * @param splitIdx Split index.
+     * @param mapQry Map query to be split.
+     * @param params Query parameters.
+     * @param collocatedGroupBy Whether the query has collocated GROUP BY keys.
+     * @return Reduce query for the given map query.
+     */
+    private static GridCacheSqlQuery split(GridCacheTwoStepQuery res, int splitIdx, final GridSqlSelect mapQry,
+        Object[] params, boolean collocatedGroupBy) {
         final boolean explain = mapQry.explain();
 
         mapQry.explain(false);
 
-        GridSqlSelect rdcQry = new GridSqlSelect().from(table(0));
+        GridSqlSelect rdcQry = new GridSqlSelect().from(table(splitIdx));
 
         // Split all select expressions into map-reduce parts.
         List<GridSqlElement> mapExps = new ArrayList<>(mapQry.allColumns());
@@ -172,9 +301,9 @@ public class GridSqlQuerySplitter {
         boolean aggregateFound = false;
 
         for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len because mapExps list can grow.
-            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocated, i == havingCol);
+            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocatedGroupBy, i == havingCol);
 
-        // Fill select expressions.
+        // -- SELECT
         mapQry.clearColumns();
 
         for (GridSqlElement exp : mapExps) // Add all map expressions as visible.
@@ -189,12 +318,18 @@ public class GridSqlQuerySplitter {
         for (int i = rdcExps.size(); i < mapExps.size(); i++)  // Add all extra map columns as invisible reduce columns.
             rdcQry.addColumn(column(((GridSqlAlias)mapExps.get(i)).alias()), false);
 
+        // -- FROM
+        findAffinityColumnConditions(mapQry.from());
+
+        // -- WHERE
+        findAffinityColumnConditions(mapQry.where());
+
         // -- GROUP BY
-        if (mapQry.groupColumns() != null && !collocated)
+        if (mapQry.groupColumns() != null && !collocatedGroupBy)
             rdcQry.groupColumns(mapQry.groupColumns());
 
         // -- HAVING
-        if (havingCol >= 0 && !collocated) {
+        if (havingCol >= 0 && !collocatedGroupBy) {
             // TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause.
             // We need to find HAVING column in reduce query.
             for (int i = visibleCols; i < rdcQry.allColumns(); i++) {
@@ -246,40 +381,25 @@ public class GridSqlQuerySplitter {
 
         IntArray paramIdxs = new IntArray(params.length);
 
-        GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(),
-            findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray());
-
-        rdc.parameterIndexes(toIntArray(paramIdxs));
-
-        paramIdxs = new IntArray(params.length);
-
         GridCacheSqlQuery map = new GridCacheSqlQuery(mapQry.getSQL(),
-            findParams(mapQry, params, new ArrayList<>(params.length), paramIdxs).toArray())
-            .columns(collectColumns(mapExps));
+            findParams(mapQry, params, new ArrayList<>(params.length), paramIdxs).toArray());
 
-        map.parameterIndexes(toIntArray(paramIdxs));
+        map.columns(collectColumns(mapExps));
+        map.parameterIndexes(toArray(paramIdxs));
 
-        Set<String> spaces = new HashSet<>(schemas.size());
+        res.addMapQuery(map);
 
-        for (String schema : schemas)
-            spaces.add(igniteH2Indexing.space(schema));
+        res.explain(explain);
 
-        // Build resulting two step query.
-        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(spaces, rdc, rdcQry.simpleQuery()).addMapQuery(map);
+        paramIdxs = new IntArray(params.length);
 
-        res.explain(explain);
+        GridCacheSqlQuery rdc = new GridCacheSqlQuery(rdcQry.getSQL(),
+            findParams(rdcQry, params, new ArrayList<>(), paramIdxs).toArray());
 
-        return res;
-    }
+        rdc.parameterIndexes(toArray(paramIdxs));
+        res.skipMergeTable(rdcQry.simpleQuery());
 
-    /**
-     * @param arr Integer array.
-     * @return Primitive int array.
-     */
-    private static int[] toIntArray(IntArray arr) {
-        int[] res = new int[arr.size()];
-        arr.toArray(res);
-        return res;
+        return rdc;
     }
 
     /**
@@ -315,25 +435,26 @@ public class GridSqlQuerySplitter {
 
     /**
      * @param qry Query.
-     * @param schemas Shemas' names.
+     * @param schemas Schema names.
+     * @param tbls Tables.
      * @return Query.
      */
-    private static GridSqlQuery collectAllSpaces(GridSqlQuery qry, Set<String> schemas) {
+    private static GridSqlQuery collectAllTables(GridSqlQuery qry, Set<String> schemas, Set<String> tbls) {
         if (qry instanceof GridSqlUnion) {
             GridSqlUnion union = (GridSqlUnion)qry;
 
-            collectAllSpaces(union.left(), schemas);
-            collectAllSpaces(union.right(), schemas);
+            collectAllTables(union.left(), schemas, tbls);
+            collectAllTables(union.right(), schemas, tbls);
         }
         else {
             GridSqlSelect select = (GridSqlSelect)qry;
 
-            collectAllSpacesInFrom(select.from(), schemas);
+            collectAllTablesInFrom(select.from(), schemas, tbls);
 
             for (GridSqlElement el : select.columns(false))
-                collectAllSpacesInSubqueries(el, schemas);
+                collectAllTablesInSubqueries(el, schemas, tbls);
 
-            collectAllSpacesInSubqueries(select.where(), schemas);
+            collectAllTablesInSubqueries(select.where(), schemas, tbls);
         }
 
         return qry;
@@ -341,45 +462,85 @@ public class GridSqlQuerySplitter {
 
     /**
      * @param from From element.
-     * @param schemas Shemas' names.
+     * @param schemas Schema names.
+     * @param tbls Tables.
      */
-    private static void collectAllSpacesInFrom(GridSqlElement from, Set<String> schemas) {
-        assert from != null;
+    private static void collectAllTablesInFrom(GridSqlElement from, final Set<String> schemas, final Set<String> tbls) {
+        findTablesInFrom(from, new IgnitePredicate<GridSqlElement>() {
+            @Override public boolean apply(GridSqlElement el) {
+                if (el instanceof GridSqlTable) {
+                    GridSqlTable tbl = (GridSqlTable)el;
+
+                    String schema = tbl.schema();
+
+                    boolean addSchema = tbls == null;
+
+                    if (tbls != null)
+                        addSchema = tbls.add(tbl.dataTable().identifier());
+
+                    if (addSchema && schema != null && schemas != null)
+                        schemas.add(schema);
+                }
+                else if (el instanceof GridSqlSubquery)
+                    collectAllTables(((GridSqlSubquery)el).select(), schemas, tbls);
+
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Processes all the tables and subqueries using the given closure.
+     *
+     * @param from FROM element.
+     * @param c Closure each found table and subquery will be passed to. If it returns {@code true}, we stop.
+     * @return {@code true} If the closure returned {@code true} for some table or subquery.
+     */
+    private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) {
+        if (from == null)
+            return false;
+
+        if (from instanceof GridSqlTable || from instanceof GridSqlSubquery)
+            return c.apply(from);
 
         if (from instanceof GridSqlJoin) {
             // Left and right.
-            collectAllSpacesInFrom(from.child(0), schemas);
-            collectAllSpacesInFrom(from.child(1), schemas);
-        }
-        else if (from instanceof GridSqlTable) {
-            String schema = ((GridSqlTable)from).schema();
+            if (findTablesInFrom(from.child(0), c))
+                return true;
 
-            if (schema != null)
-                schemas.add(schema);
+            if (findTablesInFrom(from.child(1), c))
+                return true;
+
+            // The ON condition is not processed here: it is not one of the joined parts of FROM.
+            return false;
         }
-        else if (from instanceof GridSqlSubquery)
-            collectAllSpaces(((GridSqlSubquery)from).select(), schemas);
         else if (from instanceof GridSqlAlias)
-            collectAllSpacesInFrom(from.child(), schemas);
-        else if (!(from instanceof GridSqlFunction))
-            throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL());
+            return findTablesInFrom(from.child(), c);
+        else if (from instanceof GridSqlFunction)
+            return false;
+
+        throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL());
     }
 
     /**
-     * Searches spaces in subqueries in SELECT and WHERE clauses.
+     * Searches schema names and tables in subqueries in SELECT and WHERE clauses.
+     *
      * @param el Element.
-     * @param schemas Schemas' names.
+     * @param schemas Schema names.
+     * @param tbls Tables.
      */
-    private static void collectAllSpacesInSubqueries(GridSqlElement el, Set<String> schemas) {
-        if (el instanceof GridSqlAlias)
-            el = el.child();
+    private static void collectAllTablesInSubqueries(GridSqlElement el, Set<String> schemas, Set<String> tbls) {
+        if (el == null)
+            return;
+
+        el = GridSqlAlias.unwrap(el);
 
         if (el instanceof GridSqlOperation || el instanceof GridSqlFunction) {
             for (GridSqlElement child : el)
-                collectAllSpacesInSubqueries(child, schemas);
+                collectAllTablesInSubqueries(child, schemas, tbls);
         }
         else if (el instanceof GridSqlSubquery)
-            collectAllSpaces(((GridSqlSubquery)el).select(), schemas);
+            collectAllTables(((GridSqlSubquery)el).select(), schemas, tbls);
     }
 
     /**
@@ -691,7 +852,7 @@ public class GridSqlQuerySplitter {
      * @return Column.
      */
     private static GridSqlColumn column(String name) {
-        return new GridSqlColumn(null, name, name);
+        return new GridSqlColumn(null, null, name, name);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
index 07ef1fa..a38ae68 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSelect.java
@@ -131,13 +131,12 @@ public class GridSqlSelect extends GridSqlQuery {
 
     /**
      * @param buff Statement builder.
-     * @param expression Alias expression.
+     * @param exp Alias expression.
      */
-    private static void addAlias(StatementBuilder buff, GridSqlElement expression) {
-        if (expression instanceof GridSqlAlias)
-            expression = expression.child();
+    private static void addAlias(StatementBuilder buff, GridSqlElement exp) {
+        exp = GridSqlAlias.unwrap(exp);
 
-        buff.append(StringUtils.unEnclose(expression.getSQL()));
+        buff.append(StringUtils.unEnclose(exp.getSQL()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
index 0bcdf1c..49c679d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.processors.query.h2.sql;
 
 import java.util.Collections;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.h2.command.Parser;
+import org.h2.table.Table;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -31,15 +33,56 @@ public class GridSqlTable extends GridSqlElement {
     /** */
     private final String tblName;
 
+    /** */
+    private final GridH2Table tbl;
+
+    /** */
+    private boolean affKeyCond;
+
     /**
      * @param schema Schema.
      * @param tblName Table name.
      */
     public GridSqlTable(@Nullable String schema, String tblName) {
+        this(schema, tblName, null);
+    }
+
+    /**
+     * @param tbl Table.
+     */
+    public GridSqlTable(Table tbl) {
+        this(tbl.getSchema().getName(), tbl.getName(), tbl);
+    }
+
+    /**
+     * @param schema Schema.
+     * @param tblName Table name.
+     * @param tbl H2 Table.
+     */
+    private GridSqlTable(@Nullable String schema, String tblName, @Nullable Table tbl) {
         super(Collections.<GridSqlElement>emptyList());
 
+        assert schema != null : "schema";
+        assert tblName != null : "tblName";
+
         this.schema = schema;
         this.tblName = tblName;
+
+        this.tbl = tbl instanceof GridH2Table ? (GridH2Table)tbl : null;
+    }
+
+    /**
+     * @param affKeyCond If affinity key condition is found.
+     */
+    public void affinityKeyCondition(boolean affKeyCond) {
+        this.affKeyCond = affKeyCond;
+    }
+
+    /**
+     * @return {@code true} if affinity key condition is found.
+     */
+    public boolean affinityKeyCondition() {
+        return affKeyCond;
     }
 
     /** {@inheritDoc} */
@@ -63,4 +106,31 @@ public class GridSqlTable extends GridSqlElement {
     public String tableName() {
         return tblName;
     }
+
+    /**
+     * @return Referenced data table.
+     */
+    public GridH2Table dataTable() {
+        return tbl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (!super.equals(o))
+            return false;
+
+        GridSqlTable that = (GridSqlTable)o;
+
+        return schema.equals(that.schema) && tblName.equals(that.tblName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int result = 1;
+
+        result = 31 * result + schema.hashCode();
+        result = 31 * result + tblName.hashCode();
+
+        return result;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 580058c..bb5e419 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -18,15 +18,16 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.lang.reflect.Field;
+import java.sql.Connection;
 import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -45,38 +46,46 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.result.ResultInterface;
 import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
+import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
 
 /**
  * Map query executor.
@@ -109,7 +118,7 @@ public class GridMapQueryExecutor {
     private IgniteH2Indexing h2;
 
     /** */
-    private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>();
 
     /** */
     private final GridSpinBusyLock busyLock;
@@ -136,16 +145,20 @@ public class GridMapQueryExecutor {
 
         log = ctx.log(GridMapQueryExecutor.class);
 
+        final UUID locNodeId = ctx.localNodeId();
+
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId);
+                GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
+
+                NodeResults nodeRess = qryRess.remove(nodeId);
 
                 if (nodeRess == null)
                     return;
 
-                for (QueryResults ress : nodeRess.values())
+                for (QueryResults ress : nodeRess.results().values())
                     ress.cancel();
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -156,6 +169,9 @@ public class GridMapQueryExecutor {
                     return;
 
                 try {
+                    if (msg instanceof GridCacheQueryMarshallable)
+                        ((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx);
+
                     GridMapQueryExecutor.this.onMessage(nodeId, msg);
                 }
                 finally {
@@ -180,12 +196,14 @@ public class GridMapQueryExecutor {
 
             boolean processed = true;
 
-            if (msg instanceof GridQueryRequest)
-                onQueryRequest(node, (GridQueryRequest)msg);
+            if (msg instanceof GridH2QueryRequest)
+                onQueryRequest(node, (GridH2QueryRequest)msg);
             else if (msg instanceof GridQueryNextPageRequest)
                 onNextPageRequest(node, (GridQueryNextPageRequest)msg);
             else if (msg instanceof GridQueryCancelRequest)
                 onCancel(node, (GridQueryCancelRequest)msg);
+            else if (msg instanceof GridQueryRequest)
+                onQueryRequest(node, (GridQueryRequest)msg);
             else
                 processed = false;
 
@@ -202,9 +220,19 @@ public class GridMapQueryExecutor {
      * @param msg Message.
      */
     private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
-        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
+        long qryReqId = msg.queryRequestId();
+
+        NodeResults nodeRess = resultsForNode(node.id());
+
+        boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
 
-        QueryResults results = nodeRess.remove(msg.queryRequestId());
+        if (!clear) {
+            nodeRess.onCancel(qryReqId);
+
+            GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
+        }
+
+        QueryResults results = nodeRess.results().remove(qryReqId);
 
         if (results == null)
             return;
@@ -216,13 +244,13 @@ public class GridMapQueryExecutor {
      * @param nodeId Node ID.
      * @return Results for node.
      */
-    private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId);
+    private NodeResults resultsForNode(UUID nodeId) {
+        NodeResults nodeRess = qryRess.get(nodeId);
 
         if (nodeRess == null) {
-            nodeRess = new ConcurrentHashMap8<>();
+            nodeRess = new NodeResults();
 
-            ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(nodeId, nodeRess);
+            NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
 
             if (old != null)
                 nodeRess = old;
@@ -232,19 +260,6 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param cacheName Cache name.
-     * @return Cache context or {@code null} if none.
-     */
-    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
-        GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
-
-        if (cache == null)
-            return null;
-
-        return cache.context();
-    }
-
-    /**
      * @param cctx Cache context.
      * @param p Partition ID.
      * @return Partition.
@@ -254,7 +269,7 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param cacheNames Cache names.
+     * @param cacheIds Cache IDs.
      * @param topVer Topology version.
      * @param explicitParts Explicit partitions list.
      * @param reserved Reserved list.
@@ -262,7 +277,7 @@ public class GridMapQueryExecutor {
      * @throws IgniteCheckedException If failed.
      */
     private boolean reservePartitions(
-        Collection<String> cacheNames,
+        List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
         final int[] explicitParts,
         List<GridReservable> reserved
@@ -271,8 +286,8 @@ public class GridMapQueryExecutor {
 
         Collection<Integer> partIds = wrap(explicitParts);
 
-        for (String cacheName : cacheNames) {
-            GridCacheContext<?, ?> cctx = cacheContext(cacheName);
+        for (int i = 0; i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
 
             if (cctx == null) // Cache was not found, probably was not deployed yet.
                 return false;
@@ -394,121 +409,226 @@ public class GridMapQueryExecutor {
      * @param req Query request.
      */
     private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
-        ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
+        List<Integer> cacheIds;
 
-        QueryResults qr = null;
+        if (req.extraSpaces() != null) {
+            cacheIds = new ArrayList<>(req.extraSpaces().size() + 1);
 
-        List<GridReservable> reserved = new ArrayList<>();
+            cacheIds.add(CU.cacheId(req.space()));
 
-        try {
-            // Unmarshall query params.
-            Collection<GridCacheSqlQuery> qrys;
+            for (String extraSpace : req.extraSpaces())
+                cacheIds.add(CU.cacheId(extraSpace));
+        }
+        else
+            cacheIds = Collections.singletonList(CU.cacheId(req.space()));
+
+        onQueryRequest0(node,
+            req.requestId(),
+            req.queries(),
+            cacheIds,
+            req.topologyVersion(),
+            null,
+            req.partitions(),
+            null,
+            req.pageSize(),
+            false);
+    }
 
-            try {
-                qrys = req.queries();
+    /**
+     * @param node Node.
+     * @param req Query request.
+     */
+    private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
+        Map<UUID,int[]> partsMap = req.partitions();
+        int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+        onQueryRequest0(node,
+            req.requestId(),
+            req.queries(),
+            req.caches(),
+            req.topologyVersion(),
+            partsMap,
+            parts,
+            req.tables(),
+            req.pageSize(),
+            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+    }
+
+    /**
+     * @param node Node that authored the request.
+     * @param reqId Request ID.
+     * @param qrys Queries to execute.
+     * @param cacheIds Caches which will be affected by these queries.
+     * @param topVer Topology version.
+     * @param partsMap Partitions map for unstable topology.
+     * @param parts Explicit partitions for current node.
+     * @param tbls Tables.
+     * @param pageSize Page size.
+     * @param distributedJoins Whether distributed joins can be expected to run.
+     */
+    private void onQueryRequest0(
+        ClusterNode node,
+        long reqId,
+        Collection<GridCacheSqlQuery> qrys,
+        List<Integer> cacheIds,
+        AffinityTopologyVersion topVer,
+        Map<UUID, int[]> partsMap,
+        int[] parts,
+        Collection<String> tbls,
+        int pageSize,
+        boolean distributedJoins
+    ) {
+        // Prepare to run queries.
+        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
 
-                if (!node.isLocal()) {
-                    Marshaller m = ctx.config().getMarshaller();
+        if (mainCctx == null)
+            throw new CacheException("Failed to find cache.");
 
-                    for (GridCacheSqlQuery qry : qrys)
-                        qry.unmarshallParams(m, ctx);
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException("Failed to unmarshall parameters.", e);
-            }
+        NodeResults nodeRess = resultsForNode(node.id());
 
-            List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+        QueryResults qr = null;
 
-            // Topology version can be null in rolling restart with previous version!
-            final AffinityTopologyVersion topVer = req.topologyVersion();
+        List<GridReservable> reserved = new ArrayList<>();
 
+        try {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
-                if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
-                    sendRetry(node, req.requestId());
+                if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+                    sendRetry(node, reqId);
 
                     return;
                 }
             }
 
-            // Prepare to run queries.
-            GridCacheContext<?,?> mainCctx = cacheContext(req.space());
+            qr = new QueryResults(reqId, qrys.size(), mainCctx);
 
-            if (mainCctx == null)
-                throw new CacheException("Failed to find cache: " + req.space());
+            if (nodeRess.results().put(reqId, qr) != null)
+                throw new IllegalStateException();
 
-            qr = new QueryResults(req.requestId(), qrys.size(), mainCctx);
+            // Prepare query context.
+            GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
+                node.id(),
+                reqId,
+                mainCctx.isReplicated() ? REPLICATED : MAP)
+                .filter(h2.backupFilter(topVer, parts))
+                .partitionsMap(partsMap)
+                .distributedJoins(distributedJoins)
+                .pageSize(pageSize)
+                .topologyVersion(topVer)
+                .reservations(reserved);
 
-            if (nodeRess.put(req.requestId(), qr) != null)
-                throw new IllegalStateException();
+            List<GridH2Table> snapshotedTbls = null;
 
-            h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
+            if (!F.isEmpty(tbls)) {
+                snapshotedTbls = new ArrayList<>(tbls.size());
 
-            // TODO Prepare snapshots for all the needed tables before the run.
+                for (String identifier : tbls) {
+                    GridH2Table tbl = h2.dataTable(identifier);
 
-            // Run queries.
-            int i = 0;
+                    Objects.requireNonNull(tbl, identifier);
 
-            for (GridCacheSqlQuery qry : qrys) {
-                ResultSet rs = h2.executeSqlQueryWithTimer(req.space(),
-                    h2.connectionForSpace(req.space()),
-                    qry.query(),
-                    F.asList(qry.parameters()),
-                    true);
+                    tbl.snapshotIndexes(qctx);
 
-                if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                    ctx.event().record(new CacheQueryExecutedEvent<>(
-                        node,
-                        "SQL query executed.",
-                        EVT_CACHE_QUERY_EXECUTED,
-                        CacheQueryType.SQL.name(),
-                        mainCctx.namex(),
-                        null,
-                        qry.query(),
-                        null,
-                        null,
-                        qry.parameters(),
-                        node.id(),
-                        null));
+                    snapshotedTbls.add(tbl);
                 }
+            }
+
+            Connection conn = h2.connectionForSpace(mainCctx.name());
+
+            // Here we enforce join order to have the same behavior on all the nodes.
+            h2.setupConnection(conn, distributedJoins, true);
+
+            GridH2QueryContext.set(qctx);
 
-                assert rs instanceof JdbcResultSet : rs.getClass();
+            // qctx is set, we have to release reservations inside of it.
+            reserved = null;
 
-                qr.addResult(i, qry, node.id(), rs);
+            try {
+                if (nodeRess.cancelled(reqId)) {
+                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 
-                if (qr.canceled) {
-                    qr.result(i).close();
+                    nodeRess.results().remove(reqId);
 
                     return;
                 }
 
-                // Send the first page.
-                sendNextPage(nodeRess, node, qr, i, req.pageSize());
+                // Run queries.
+                int i = 0;
+
+                boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+
+                for (GridCacheSqlQuery qry : qrys) {
+                    ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
+                        F.asList(qry.parameters()), true);
+
+                    if (evt) {
+                        ctx.event().record(new CacheQueryExecutedEvent<>(
+                            node,
+                            "SQL query executed.",
+                            EVT_CACHE_QUERY_EXECUTED,
+                            CacheQueryType.SQL.name(),
+                            mainCctx.namex(),
+                            null,
+                            qry.query(),
+                            null,
+                            null,
+                            qry.parameters(),
+                            node.id(),
+                            null));
+                    }
+
+                    assert rs instanceof JdbcResultSet : rs.getClass();
+
+                    qr.addResult(i, qry, node.id(), rs);
+
+                    if (qr.canceled) {
+                        qr.result(i).close();
+
+                        return;
+                    }
+
+                    // Send the first page.
+                    sendNextPage(nodeRess, node, qr, i, pageSize);
+
+                    i++;
+                }
+            }
+            finally {
+                GridH2QueryContext.clearThreadLocal();
+
+                if (!distributedJoins)
+                    qctx.clearContext(false);
 
-                i++;
+                if (!F.isEmpty(snapshotedTbls)) {
+                    for (GridH2Table dataTbl : snapshotedTbls)
+                        dataTbl.releaseSnapshots();
+                }
             }
         }
         catch (Throwable e) {
             if (qr != null) {
-                nodeRess.remove(req.requestId(), qr);
+                nodeRess.results().remove(reqId, qr);
 
                 qr.cancel();
             }
 
-            U.error(log, "Failed to execute local query: " + req, e);
+            if (X.hasCause(e, GridH2RetryException.class))
+                sendRetry(node, reqId);
+            else {
+                U.error(log, "Failed to execute local query.", e);
 
-            sendError(node, req.requestId(), e);
+                sendError(node, reqId, e);
 
-            if (e instanceof Error)
-                throw (Error)e;
+                if (e instanceof Error)
+                    throw (Error)e;
+            }
         }
         finally {
-            h2.setFilters(null);
-
-            // Release reserved partitions.
-            for (GridReservable r : reserved)
-                r.release();
+            if (reserved != null) {
+                // Release reserved partitions.
+                for (int i = 0; i < reserved.size(); i++)
+                    reserved.get(i).release();
+            }
         }
     }
 
@@ -521,8 +641,11 @@ public class GridMapQueryExecutor {
         try {
             GridQueryFailResponse msg = new GridQueryFailResponse(qryReqId, err);
 
-            if (node.isLocal())
+            if (node.isLocal()) {
+                U.error(log, "Failed to run map query on local node.", err);
+
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+            }
             else
                 ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
         }
@@ -538,9 +661,9 @@ public class GridMapQueryExecutor {
      * @param req Request.
      */
     private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+        NodeResults nodeRess = qryRess.get(node.id());
 
-        QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
+        QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId());
 
         if (qr == null || qr.canceled)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
@@ -549,12 +672,13 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param nodeRess Results.
      * @param node Node.
      * @param qr Query results.
      * @param qry Query.
      * @param pageSize Page size.
      */
-    private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, ClusterNode node, QueryResults qr, int qry,
+    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
         int pageSize) {
         QueryResult res = qr.result(qry);
 
@@ -570,14 +694,14 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.remove(qr.qryReqId, qr);
+                nodeRess.results().remove(qr.qryReqId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
             GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCount : -1 ,
+                page == 0 ? res.rowCnt : -1 ,
                 res.cols,
                 loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
                 loc ? rows : null);
@@ -597,22 +721,26 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param reqId Request ID.
-     * @throws IgniteCheckedException If failed.
      */
-    private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException {
-        boolean loc = node.isLocal();
+    private void sendRetry(ClusterNode node, long reqId) {
+        try {
+            boolean loc = node.isLocal();
 
-        GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
             /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
-            loc ? null : Collections.<Message>emptyList(),
-            loc ? Collections.<Value[]>emptyList() : null);
+                loc ? null : Collections.<Message>emptyList(),
+                loc ? Collections.<Value[]>emptyList() : null);
 
-        msg.retry(h2.readyTopologyVersion());
+            msg.retry(h2.readyTopologyVersion());
 
-        if (loc)
-            h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
-        else
-            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+            if (loc)
+                h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+        }
+        catch (Exception e) {
+            U.warn(log, "Failed to send retry message: " + e.getMessage());
+        }
     }
 
     /**
@@ -626,6 +754,44 @@ public class GridMapQueryExecutor {
         }
     }
 
+
+    /**
+     *
+     */
+    private static class NodeResults {
+        /** */
+        private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+
+        /** */
+        private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
+            new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+
+        /**
+         * @return All results.
+         */
+        ConcurrentMap<Long, QueryResults> results() {
+            return res;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code True} if query was already cancelled.
+         */
+        boolean cancelled(long qryId) {
+            return qryHist.get(qryId) != null;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code True} if this call recorded the cancellation, {@code false} if it was already recorded.
+         */
+        boolean onCancel(long qryId) {
+            Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+
+            return old == null;
+        }
+    }
+
     /**
      *
      */
@@ -637,7 +803,7 @@ public class GridMapQueryExecutor {
         private final AtomicReferenceArray<QueryResult> results;
 
         /** */
-        private final GridCacheContext<?,?> cctx;
+        private final GridCacheContext<?, ?> cctx;
 
         /** */
         private volatile boolean canceled;
@@ -687,6 +853,9 @@ public class GridMapQueryExecutor {
             return true;
         }
 
+        /**
+         *
+         */
         void cancel() {
             if (canceled)
                 return;
@@ -728,7 +897,7 @@ public class GridMapQueryExecutor {
         private int page;
 
         /** */
-        private final int rowCount;
+        private final int rowCnt;
 
         /** */
         private volatile boolean closed;
@@ -752,7 +921,7 @@ public class GridMapQueryExecutor {
                 throw new IllegalStateException(e); // Must not happen.
             }
 
-            rowCount = res.getRowCount();
+            rowCnt = res.getRowCount();
             cols = res.getVisibleColumnCount();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 3914bd7..796ea66 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -18,18 +18,22 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.h2.engine.Constants;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
 import org.h2.index.Cursor;
@@ -41,7 +45,6 @@ import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -57,7 +60,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
     /** Remaining rows per source node ID. */
-    private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
+    private Map<UUID, Counter> remainingRows;
 
     /** */
     private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -136,11 +139,19 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param nodeId Node ID.
+     * Set source nodes.
+     *
+     * @param nodes Nodes.
      */
-    public void addSource(UUID nodeId) {
-        if (remainingRows.put(nodeId, new Counter()) != null)
-            throw new IllegalStateException();
+    public void setSources(Collection<ClusterNode> nodes) {
+        assert remainingRows == null;
+
+        remainingRows = U.newHashMap(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            if (remainingRows.put(node.id(), new Counter()) != null)
+                throw new IllegalStateException("Duplicate node id: " + node.id());
+        }
     }
 
     /**
@@ -283,8 +294,8 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
-        return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
     }
 
     /** {@inheritDoc} */
@@ -318,51 +329,9 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * Cursor over iterator.
-     */
-    protected class IteratorCursor implements Cursor {
-        /** */
-        protected Iterator<Row> iter;
-
-        /** */
-        protected Row cur;
-
-        /**
-         * @param iter Iterator.
-         */
-        public IteratorCursor(Iterator<Row> iter) {
-            assert iter != null;
-
-            this.iter = iter;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row get() {
-            return cur;
-        }
-
-        /** {@inheritDoc} */
-        @Override public SearchRow getSearchRow() {
-            return get();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            cur = iter.hasNext() ? iter.next() : null;
-
-            return cur != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean previous() {
-            throw DbException.getUnsupportedException("previous");
-        }
-    }
-
-    /**
      * Fetching cursor.
      */
-    protected class FetchingCursor extends IteratorCursor {
+    protected class FetchingCursor extends GridH2Cursor {
         /** */
         private Iterator<Row> stream;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 5639340..8a8577f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -25,6 +25,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import javax.cache.CacheException;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
 import org.h2.result.Row;
@@ -73,7 +75,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
     /** {@inheritDoc} */
     @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
-        return new IteratorCursor(fetched.iterator());
+        return new GridH2Cursor(fetched.iterator());
     }
 
     /** {@inheritDoc} */
@@ -112,7 +114,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
             }
 
             @Override public Row next() {
-                return new Row(iter.next(), 0);
+                return GridH2RowFactory.create(iter.next());
             }
 
             @Override public void remove() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index a86cbcd..1489021 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -50,8 +50,8 @@ public class GridMergeTable extends TableBase {
     }
 
     /** {@inheritDoc} */
-    @Override public void lock(Session session, boolean exclusive, boolean force) {
-        // No-op.
+    @Override public boolean lock(Session session, boolean exclusive, boolean force) {
+        return false;
     }
 
     /** {@inheritDoc} */


Mime
View raw message