ignite-commits mailing list archives

From sboi...@apache.org
Subject [09/11] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Fri, 22 Jul 2016 14:08:48 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
new file mode 100644
index 0000000..0f76316
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.cache.CacheException;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.typedef.F;
+import org.h2.command.dml.Query;
+import org.h2.command.dml.Select;
+import org.h2.command.dml.SelectUnion;
+import org.h2.expression.Comparison;
+import org.h2.expression.Expression;
+import org.h2.expression.ExpressionColumn;
+import org.h2.index.IndexCondition;
+import org.h2.index.ViewIndex;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.SubQueryInfo;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+import org.h2.table.TableView;
+
+/**
+ * Collocation model for a query.
+ */
+public final class GridH2CollocationModel {
+    /** */
+    public static final int MULTIPLIER_COLLOCATED = 1;
+
+    /** */
+    private static final int MULTIPLIER_UNICAST = 50;
+
+    /** */
+    private static final int MULTIPLIER_BROADCAST = 200;
+
+    /** */
+    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
+
+    /** */
+    private final GridH2CollocationModel upper;
+
+    /** */
+    private final int filter;
+
+    /** */
+    private final boolean view;
+
+    /** */
+    private int multiplier;
+
+    /** */
+    private Type type;
+
+    /** */
+    private GridH2CollocationModel[] children;
+
+    /** */
+    private TableFilter[] childFilters;
+
+    /** */
+    private List<GridH2CollocationModel> unions;
+
+    /** */
+    private Select select;
+
+    /** */
+    private final boolean validate;
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param view This model will be a subquery (or top level query) and must contain child filters.
+     * @param validate Query validation flag.
+     */
+    private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) {
+        this.upper = upper;
+        this.filter = filter;
+        this.view = view;
+        this.validate = validate;
+    }
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param unions Unions.
+     * @param view This model will be a subquery (or top level query) and must contain child filters.
+     * @param validate Query validation flag.
+     * @return Created child collocation model.
+     */
+    private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper,
+        int filter,
+        List<GridH2CollocationModel> unions,
+        boolean view,
+        boolean validate) {
+        GridH2CollocationModel child = new GridH2CollocationModel(upper, filter, view, validate);
+
+        if (unions != null) {
+            // Bind created child to unions.
+            assert upper == null || upper.child(filter, false) != null || unions.isEmpty();
+
+            if (upper != null && unions.isEmpty()) {
+                assert upper.child(filter, false) == null;
+
+                upper.children[filter] = child;
+            }
+
+            unions.add(child);
+
+            child.unions = unions;
+        }
+        else if (upper != null) {
+            // Bind created child to upper model.
+            assert upper.child(filter, false) == null;
+
+            upper.children[filter] = child;
+        }
+
+        return child;
+    }
+
+    /**
+     * @param childFilters New child filters.
+     * @return {@code true} If child filters were updated.
+     */
+    private boolean childFilters(TableFilter[] childFilters) {
+        assert childFilters != null;
+        assert view;
+
+        Select select = childFilters[0].getSelect();
+
+        assert this.select == null || this.select == select;
+
+        if (this.select == null) {
+            this.select = select;
+
+            assert this.childFilters == null;
+        }
+        else if (Arrays.equals(this.childFilters, childFilters))
+            return false;
+
+        if (this.childFilters == null) {
+            // We have to clone because H2 reuses the array and reorders its elements.
+            this.childFilters = childFilters.clone();
+
+            children = new GridH2CollocationModel[childFilters.length];
+        }
+        else {
+            assert this.childFilters.length == childFilters.length;
+
+            // We have to copy because H2 reuses the array and reorders its elements.
+            System.arraycopy(childFilters, 0, this.childFilters, 0, childFilters.length);
+
+            Arrays.fill(children, null);
+        }
+
+        // Reset results.
+        type = null;
+        multiplier = 0;
+
+        return true;
+    }
+
+    /**
+     * @param i Index.
+     * @param f Table filter.
+     * @return {@code true} If the child is a table or a view.
+     */
+    private boolean isChildTableOrView(int i, TableFilter f) {
+        if (f == null)
+            f = childFilters[i];
+
+        Table t = f.getTable();
+
+        return t.isView() || t instanceof GridH2Table;
+    }
+
+    /**
+     * Do the needed calculations.
+     */
+    private void calculate() {
+        if (type != null)
+            return;
+
+        if (view) { // We are at (sub-)query model.
+            assert childFilters != null;
+
+            boolean collocated = true;
+            boolean partitioned = false;
+            int maxMultiplier = MULTIPLIER_COLLOCATED;
+
+            for (int i = 0; i < childFilters.length; i++) {
+                GridH2CollocationModel child = child(i, true);
+
+                Type t = child.type(true);
+
+                if (child.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                    maxMultiplier = child.multiplier;
+
+                if (t.isPartitioned()) {
+                    partitioned = true;
+
+                    if (!t.isCollocated()) {
+                        collocated = false;
+
+                        int m = child.multiplier(true);
+
+                        if (m > maxMultiplier) {
+                            maxMultiplier = m;
+
+                            if (maxMultiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+                                break;
+                        }
+                    }
+                }
+            }
+
+            type = Type.of(partitioned, collocated);
+            multiplier = maxMultiplier;
+        }
+        else {
+            assert upper != null;
+            assert childFilters == null;
+
+            // We are at table instance.
+            GridH2Table tbl = (GridH2Table)upper.childFilters[filter].getTable();
+
+            // Only partitioned tables will do distributed joins.
+            if (!tbl.isPartitioned()) {
+                type = Type.REPLICATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+
+                return;
+            }
+
+            // If we are the first partitioned table in a join, then we are the "base" for all the remaining
+            // partitioned tables, which will need to fetch remote results (if there is no affinity condition).
+            // Since this query is broadcast to all the affinity nodes, the "base" does not need to fetch remote results.
+            if (!upper.findPartitionedTableBefore(filter)) {
+                type = Type.PARTITIONED_COLLOCATED;
+                multiplier = MULTIPLIER_COLLOCATED;
+            }
+            else {
+                // It is enough to make sure that our previous join by affinity key is collocated; then we are
+                // collocated as well. If we at least have an affinity key condition, then we do unicast, which is cheaper.
+                switch (upper.joinedWithCollocated(filter)) {
+                    case JOINED_WITH_COLLOCATED:
+                        type = Type.PARTITIONED_COLLOCATED;
+                        multiplier = MULTIPLIER_COLLOCATED;
+
+                        break;
+
+                    case HAS_AFFINITY_CONDITION:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_UNICAST;
+
+                        break;
+
+                    case NONE:
+                        type = Type.PARTITIONED_NOT_COLLOCATED;
+                        multiplier = MULTIPLIER_BROADCAST;
+
+                        break;
+
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+
+            if (upper.previousReplicated(filter))
+                multiplier = MULTIPLIER_REPLICATED_NOT_LAST;
+        }
+    }
+
+    /**
+     * @param f Current filter.
+     * @return {@code true} If partitioned table was found.
+     */
+    private boolean findPartitionedTableBefore(int f) {
+        for (int i = 0; i < f; i++) {
+            GridH2CollocationModel child = child(i, true);
+
+            // The child can be null if it is not a GridH2Table and not a sub-query;
+            // it is some kind of function table or anything else that is considered replicated.
+            if (child != null && child.type(true).isPartitioned())
+                return true;
+        }
+
+        // We have to search globally in upper queries as well.
+        return upper != null && upper.findPartitionedTableBefore(filter);
+    }
+
+    /**
+     * @param f Current filter.
+     * @return {@code true} If previous table is REPLICATED.
+     */
+    private boolean previousReplicated(int f) {
+        if (f > 0 && child(f - 1, true).type(true) == Type.REPLICATED)
+            return true;
+
+        return upper != null && upper.previousReplicated(filter);
+    }
+
+    /**
+     * @param f Filter.
+     * @return Affinity join type.
+     */
+    private Affinity joinedWithCollocated(int f) {
+        TableFilter tf = childFilters[f];
+
+        GridH2Table tbl = (GridH2Table)tf.getTable();
+
+        if (validate) {
+            if (tbl.rowDescriptor().context().customAffinityMapper())
+                throw customAffinityError(tbl.spaceName());
+
+            if (F.isEmpty(tf.getIndexConditions())) {
+                throw new CacheException("Failed to prepare distributed join query: " +
+                    "join condition does not use index [joinedCache=" + tbl.spaceName() +
+                    ", plan=" + tf.getSelect().getPlanSQL() + ']');
+            }
+        }
+
+        IndexColumn affCol = tbl.getAffinityKeyColumn();
+
+        boolean affKeyCondFound = false;
+
+        if (affCol != null) {
+            ArrayList<IndexCondition> idxConditions = tf.getIndexConditions();
+
+            int affColId = affCol.column.getColumnId();
+
+            for (int i = 0; i < idxConditions.size(); i++) {
+                IndexCondition c = idxConditions.get(i);
+
+                int cmpType = c.getCompareType();
+
+                if ((cmpType == Comparison.EQUAL || cmpType == Comparison.EQUAL_NULL_SAFE) &&
+                    c.getColumn().getColumnId() == affColId && c.isEvaluatable()) {
+                    affKeyCondFound = true;
+
+                    Expression exp = c.getExpression();
+
+                    exp = exp.getNonAliasExpression();
+
+                    if (exp instanceof ExpressionColumn) {
+                        ExpressionColumn expCol = (ExpressionColumn)exp;
+
+                        // This is one of our previous joins.
+                        TableFilter prevJoin = expCol.getTableFilter();
+
+                        if (prevJoin != null) {
+                            GridH2CollocationModel cm = child(indexOf(prevJoin), true);
+
+                            if (cm != null) {
+                                Type t = cm.type(true);
+
+                                if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate))
+                                    return Affinity.JOINED_WITH_COLLOCATED;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return affKeyCondFound ? Affinity.HAS_AFFINITY_CONDITION : Affinity.NONE;
+    }
+
+    /**
+     * @param f Table filter.
+     * @return Index.
+     */
+    private int indexOf(TableFilter f) {
+        for (int i = 0; i < childFilters.length; i++) {
+            if (childFilters[i] == f)
+                return i;
+        }
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * @param f Table filter.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) {
+        Column col = expCol.getColumn();
+
+        if (col == null)
+            return false;
+
+        Table t = col.getTable();
+
+        if (t.isView()) {
+            Query qry;
+
+            if (f.getIndex() != null)
+                qry = getSubQuery(f);
+            else
+                qry = GridSqlQueryParser.VIEW_QUERY.get((TableView)t);
+
+            return isAffinityColumn(qry, expCol, validate);
+        }
+
+        if (t instanceof GridH2Table) {
+            if (validate && ((GridH2Table)t).rowDescriptor().context().customAffinityMapper())
+                throw customAffinityError(((GridH2Table)t).spaceName());
+
+            IndexColumn affCol = ((GridH2Table)t).getAffinityKeyColumn();
+
+            return affCol != null && col.getColumnId() == affCol.column.getColumnId();
+        }
+
+        return false;
+    }
+
+    /**
+     * @param qry Query.
+     * @param expCol Expression column.
+     * @param validate Query validation flag.
+     * @return {@code true} If it is an affinity column.
+     */
+    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) {
+        if (qry.isUnion()) {
+            SelectUnion union = (SelectUnion)qry;
+
+            return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate);
+        }
+
+        Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
+
+        if (exp instanceof ExpressionColumn) {
+            expCol = (ExpressionColumn)exp;
+
+            return isAffinityColumn(expCol.getTableFilter(), expCol, validate);
+        }
+
+        return false;
+    }
+
+    /**
+     * @return Multiplier.
+     */
+    public int calculateMultiplier() {
+        // We don't need multiplier for union here because it will be summarized in H2.
+        return multiplier(false);
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Multiplier.
+     */
+    private int multiplier(boolean withUnion) {
+        calculate();
+
+        assert multiplier != 0;
+
+        if (withUnion && unions != null) {
+            int maxMultiplier = 0;
+
+            for (int i = 0; i < unions.size(); i++) {
+                int m = unions.get(i).multiplier(false);
+
+                if (m > maxMultiplier)
+                    maxMultiplier = m;
+            }
+
+            return maxMultiplier;
+        }
+
+        return multiplier;
+    }
+
+    /**
+     * @param withUnion With respect to union.
+     * @return Type.
+     */
+    private Type type(boolean withUnion) {
+        calculate();
+
+        assert type != null;
+
+        if (withUnion && unions != null) {
+            Type left = unions.get(0).type(false);
+
+            for (int i = 1; i < unions.size(); i++) {
+                Type right = unions.get(i).type(false);
+
+                if (!left.isCollocated() || !right.isCollocated()) {
+                    left = Type.PARTITIONED_NOT_COLLOCATED;
+
+                    break;
+                }
+                else if (!left.isPartitioned() && !right.isPartitioned())
+                    left = Type.REPLICATED;
+                else
+                    left = Type.PARTITIONED_COLLOCATED;
+            }
+
+            return left;
+        }
+
+        return type;
+    }
+
+    /**
+     * @param i Index.
+     * @param create Create child if needed.
+     * @return Child collocation.
+     */
+    private GridH2CollocationModel child(int i, boolean create) {
+        GridH2CollocationModel child = children[i];
+
+        if (child == null && create && isChildTableOrView(i, null)) {
+            TableFilter f = childFilters[i];
+
+            if (f.getTable().isView()) {
+                if (f.getIndex() == null) {
+                    // If we don't have a view index yet, then we just create an empty model and it must be filled later.
+                    child = createChildModel(this, i, null, true, validate);
+                }
+                else
+                    child = buildCollocationModel(this, i, getSubQuery(f), null, validate);
+            }
+            else
+                child = createChildModel(this, i, null, false, validate);
+
+            assert child != null;
+            assert children[i] == child;
+        }
+
+        return child;
+    }
+
+    /**
+     * @param f Table filter.
+     * @return Sub-query.
+     */
+    private static Query getSubQuery(TableFilter f) {
+        return ((ViewIndex)f.getIndex()).getQuery();
+    }
+
+    /**
+     * @return Unions list.
+     */
+    private List<GridH2CollocationModel> getOrCreateUnions() {
+        if (unions == null) {
+            unions = new ArrayList<>(4);
+
+            unions.add(this);
+        }
+
+        return unions;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param info Sub-query info.
+     * @param filters Filters.
+     * @param filter Filter.
+     * @param validate Query validation flag.
+     * @return Collocation.
+     */
+    public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
+        TableFilter[] filters, int filter, boolean validate) {
+        GridH2CollocationModel cm;
+
+        if (info != null) {
+            // Go up until we reach the root query.
+            cm = buildCollocationModel(qctx, info.getUpper(), info.getFilters(), info.getFilter(), validate);
+        }
+        else {
+            // We are at the root query.
+            cm = qctx.queryCollocationModel();
+
+            if (cm == null) {
+                cm = createChildModel(null, -1, null, true, validate);
+
+                qctx.queryCollocationModel(cm);
+            }
+        }
+
+        assert cm.view;
+
+        Select select = filters[0].getSelect();
+
+        // Handle union. We have to rely on the fact that the select will be the same on the uppermost select.
+        // For sub-queries we will drop collocation models, so that they will be recalculated anyway.
+        if (cm.select != null && cm.select != select) {
+            List<GridH2CollocationModel> unions = cm.getOrCreateUnions();
+
+            // Try to find this select in existing unions.
+            // Start with 1 because at 0 it will always be cm.
+            for (int i = 1; i < unions.size(); i++) {
+                GridH2CollocationModel u = unions.get(i);
+
+                if (u.select == select) {
+                    cm = u;
+
+                    break;
+                }
+            }
+
+            // Nothing was found, need to create new child in union.
+            if (cm.select != select)
+                cm = createChildModel(cm.upper, cm.filter, unions, true, validate);
+        }
+
+        cm.childFilters(filters);
+
+        return cm.child(filter, true);
+    }
+
+    /**
+     * @param qry Query.
+     * @return {@code true} If the query is collocated.
+     */
+    public static boolean isCollocated(Query qry) {
+        GridH2CollocationModel mdl = buildCollocationModel(null, -1, qry, null, true);
+
+        Type type = mdl.type(true);
+
+        if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
+            throw new CacheException("Failed to execute query: for distributed join " +
+                "all REPLICATED caches must be at the end of the joined tables list.");
+
+        return type.isCollocated();
+    }
+
+    /**
+     * @param upper Upper.
+     * @param filter Filter.
+     * @param qry Query.
+     * @param unions Unions.
+     * @param validate Query validation flag.
+     * @return Built model.
+     */
+    private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper,
+        int filter,
+        Query qry,
+        List<GridH2CollocationModel> unions,
+        boolean validate) {
+        if (qry.isUnion()) {
+            if (unions == null)
+                unions = new ArrayList<>();
+
+            SelectUnion union = (SelectUnion)qry;
+
+            GridH2CollocationModel left = buildCollocationModel(upper, filter, union.getLeft(), unions, validate);
+            GridH2CollocationModel right = buildCollocationModel(upper, filter, union.getRight(), unions, validate);
+
+            assert left != null;
+            assert right != null;
+
+            return upper != null ? upper : left;
+        }
+
+        Select select = (Select)qry;
+
+        List<TableFilter> list = new ArrayList<>();
+
+        for (TableFilter f = select.getTopTableFilter(); f != null; f = f.getJoin())
+            list.add(f);
+
+        TableFilter[] filters = list.toArray(new TableFilter[list.size()]);
+
+        GridH2CollocationModel cm = createChildModel(upper, filter, unions, true, validate);
+
+        cm.childFilters(filters);
+
+        for (int i = 0; i < filters.length; i++) {
+            TableFilter f = filters[i];
+
+            if (f.getTable().isView())
+                buildCollocationModel(cm, i, getSubQuery(f), null, validate);
+            else if (f.getTable() instanceof GridH2Table)
+                createChildModel(cm, i, null, false, validate);
+        }
+
+        return upper != null ? upper : cm;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Error.
+     */
+    private static CacheException customAffinityError(String cacheName) {
+        return new CacheException("Failed to prepare distributed join query: cannot use distributed joins for a cache " +
+            "with a custom AffinityKeyMapper configured. " +
+            "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']');
+    }
+
+    /**
+     * Collocation type.
+     */
+    private enum Type {
+        /** */
+        PARTITIONED_COLLOCATED(true, true),
+
+        /** */
+        PARTITIONED_NOT_COLLOCATED(true, false),
+
+        /** */
+        REPLICATED(false, true);
+
+        /** */
+        private final boolean partitioned;
+
+        /** */
+        private final boolean collocated;
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         */
+        Type(boolean partitioned, boolean collocated) {
+            this.partitioned = partitioned;
+            this.collocated = collocated;
+        }
+
+        /**
+         * @return {@code true} If partitioned.
+         */
+        public boolean isPartitioned() {
+            return partitioned;
+        }
+
+        /**
+         * @return {@code true} If collocated.
+         */
+        public boolean isCollocated() {
+            return collocated;
+        }
+
+        /**
+         * @param partitioned Partitioned.
+         * @param collocated Collocated.
+         * @return Type.
+         */
+        static Type of(boolean partitioned, boolean collocated) {
+            if (collocated)
+                return partitioned ? Type.PARTITIONED_COLLOCATED : Type.REPLICATED;
+
+            assert partitioned;
+
+            return Type.PARTITIONED_NOT_COLLOCATED;
+        }
+    }
+
+    /**
+     * Affinity of a table relative to previous joined tables.
+     */
+    private enum Affinity {
+        /** */
+        NONE,
+
+        /** */
+        HAS_AFFINITY_CONDITION,
+
+        /** */
+        JOINED_WITH_COLLOCATED
+    }
+}
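
For readers skimming the diff: the collocation model assigns each joined table a cost multiplier that H2's
optimizer folds into its index selection. The following is a minimal, self-contained sketch of that decision
table; the constants and Affinity cases mirror GridH2CollocationModel.calculate(), while the class and
method names here are illustrative and not part of the committed API.

// Illustrative restatement of the multiplier selection in GridH2CollocationModel.calculate().
// The surrounding class is hypothetical; only the constants and the decision order come from the commit.
public class CollocationMultiplierSketch {
    static final int MULTIPLIER_COLLOCATED = 1;
    static final int MULTIPLIER_UNICAST = 50;
    static final int MULTIPLIER_BROADCAST = 200;
    static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;

    enum Affinity { NONE, HAS_AFFINITY_CONDITION, JOINED_WITH_COLLOCATED }

    /**
     * @param partitioned Whether the joined table is PARTITIONED.
     * @param firstPartitionedInJoin Whether it is the first partitioned table in the join order.
     * @param affinity How it joins relative to the previously joined partitioned tables.
     * @param previousReplicated Whether a REPLICATED table precedes it in the join order.
     * @return Cost multiplier fed into H2's query planner.
     */
    static int multiplier(boolean partitioned,
        boolean firstPartitionedInJoin,
        Affinity affinity,
        boolean previousReplicated) {
        if (!partitioned)
            return MULTIPLIER_COLLOCATED; // Replicated tables never need remote lookups.

        int m;

        if (firstPartitionedInJoin || affinity == Affinity.JOINED_WITH_COLLOCATED)
            m = MULTIPLIER_COLLOCATED;  // Collocated join: no extra network round trips.
        else if (affinity == Affinity.HAS_AFFINITY_CONDITION)
            m = MULTIPLIER_UNICAST;     // Affinity-key equality: unicast range lookups.
        else
            m = MULTIPLIER_BROADCAST;   // No affinity condition: broadcast to all affinity nodes.

        // A REPLICATED table placed before a partitioned one forces the most expensive plan,
        // which isCollocated(Query) later rejects with a CacheException.
        return previousReplicated ? MULTIPLIER_REPLICATED_NOT_LAST : m;
    }

    public static void main(String[] args) {
        System.out.println(multiplier(true, false, Affinity.HAS_AFFINITY_CONDITION, false)); // 50
        System.out.println(multiplier(true, false, Affinity.NONE, true));                    // 10000
    }
}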

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java
index a2f60c4..66d5736 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Cursor.java
@@ -28,23 +28,44 @@ import org.h2.result.SearchRow;
  */
 public class GridH2Cursor implements Cursor {
     /** */
-    private Iterator<GridH2Row> iter;
+    public static final Cursor EMPTY = new Cursor() {
+        @Override public Row get() {
+            return null;
+        }
+
+        @Override public SearchRow getSearchRow() {
+            return null;
+        }
+
+        @Override public boolean next() {
+            return false;
+        }
+
+        @Override public boolean previous() {
+            return false;
+        }
+    };
 
     /** */
-    private Row row;
+    protected Iterator<? extends Row> iter;
+
+    /** */
+    protected Row cur;
 
     /**
      * Constructor.
      *
      * @param iter Rows iterator.
      */
-    public GridH2Cursor(Iterator<GridH2Row> iter) {
+    public GridH2Cursor(Iterator<? extends Row> iter) {
+        assert iter != null;
+
         this.iter = iter;
     }
 
     /** {@inheritDoc} */
     @Override public Row get() {
-        return row;
+        return cur;
     }
 
     /** {@inheritDoc} */
@@ -54,12 +75,9 @@ public class GridH2Cursor implements Cursor {
 
     /** {@inheritDoc} */
     @Override public boolean next() {
-        row = null;
-
-        if (iter.hasNext())
-            row = iter.next();
+        cur = iter.hasNext() ? iter.next() : null;
 
-        return row != null;
+        return cur != null;
     }
 
     /** {@inheritDoc} */
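
The cursor above now accepts any Iterator&lt;? extends Row&gt; instead of Iterator&lt;GridH2Row&gt; and adds a shared
EMPTY cursor. A minimal sketch of the same iterator-to-cursor adapter pattern follows, using simplified
stand-in interfaces rather than the real org.h2.index.Cursor and org.h2.result.Row so it stays
self-contained; all names in the sketch are illustrative only.

import java.util.Iterator;
import java.util.List;

// Stand-ins for org.h2.result.Row and org.h2.index.Cursor (hypothetical, for illustration only).
interface Row { Object getValue(int col); }

interface Cursor {
    Row get();
    boolean next();
}

// Same adapter pattern as GridH2Cursor: pull rows from an arbitrary iterator,
// caching the current row until the next call to next().
class IteratorCursor implements Cursor {
    private final Iterator<? extends Row> iter;
    private Row cur;

    IteratorCursor(Iterator<? extends Row> iter) {
        assert iter != null;
        this.iter = iter;
    }

    @Override public Row get() {
        return cur;
    }

    @Override public boolean next() {
        cur = iter.hasNext() ? iter.next() : null;
        return cur != null;
    }
}

class IteratorCursorDemo {
    public static void main(String[] args) {
        Row r1 = col -> 100;
        Row r2 = col -> 200;

        Cursor c = new IteratorCursor(List.of(r1, r2).iterator());

        while (c.next())
            System.out.println(c.get().getValue(0)); // Prints 100, then 200.
    }
}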

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java
new file mode 100644
index 0000000..f53f1b3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2DefaultTableEngine.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.h2.api.TableEngine;
+import org.h2.command.ddl.CreateTableData;
+import org.h2.table.RegularTable;
+import org.h2.table.Table;
+
+/**
+ * Default table engine.
+ */
+public class GridH2DefaultTableEngine implements TableEngine {
+    /** {@inheritDoc} */
+    @Override public Table createTable(CreateTableData data) {
+        assert !data.persistData && !data.persistIndexes;
+
+        if (data.isHidden && data.id == 0 && "SYS".equals(data.tableName))
+            return new GridH2MetaTable(data);
+
+        return new RegularTable(data);
+    }
+}
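
H2 instantiates a TableEngine by class name, so the engine above only takes effect once it is referenced
from DDL or configured as the database default. A hedged sketch of one possible wiring follows; the
DEFAULT_TABLE_ENGINE connection setting and the classpath assumptions are about the bundled H2 version,
not something introduced by this commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Sketch only: assumes H2 supports the DEFAULT_TABLE_ENGINE setting in the JDBC URL and that the
// Ignite indexing module (with GridH2DefaultTableEngine) is on the classpath.
public class TableEngineWiringSketch {
    public static void main(String[] args) throws Exception {
        String engine = "org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine";

        try (Connection conn = DriverManager.getConnection(
                 "jdbc:h2:mem:test;DEFAULT_TABLE_ENGINE=" + engine);
             Statement stmt = conn.createStatement()) {

            // Tables created on this connection now go through the custom engine, which returns a
            // RegularTable for ordinary tables and a GridH2MetaTable for the hidden SYS table.
            stmt.executeUpdate("CREATE TABLE person (id INT PRIMARY KEY, name VARCHAR)");
        }
    }
}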

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index fbf7c7c..c29239f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,49 +17,164 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridFilteredIterator;
+import org.apache.ignite.internal.util.typedef.CIX2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.IndexCondition;
+import org.h2.index.IndexLookupBatch;
+import org.h2.index.ViewIndex;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
+import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
+import org.h2.util.DoneFuture;
 import org.h2.value.Value;
+import org.h2.value.ValueNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
+import static org.h2.result.Row.MEMORY_CALCULATE;
+
 /**
  * Index base.
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
-    protected static final ThreadLocal<IndexingQueryFilter> filters = new ThreadLocal<>();
+    private static final Object EXPLICIT_NULL = new Object();
+
+    /** */
+    private static final AtomicLong idxIdGen = new AtomicLong();
+
+    /** */
+    protected final long idxId = idxIdGen.incrementAndGet();
+
+    /** */
+    private final ThreadLocal<Object> snapshot = new ThreadLocal<>();
+
+    /** */
+    private Object msgTopic;
+
+    /** */
+    private GridMessageListener msgLsnr;
 
     /** */
-    protected final int keyCol;
+    private IgniteLogger log;
 
     /** */
-    protected final int valCol;
+    private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
+            onMessage0(clusterNode.id(), msg);
+        }
+    };
 
     /**
-     * @param keyCol Key column.
-     * @param valCol Value column.
+     * @param tbl Table.
      */
-    protected GridH2IndexBase(int keyCol, int valCol) {
-        this.keyCol = keyCol;
-        this.valCol = valCol;
+    protected final void initDistributedJoinMessaging(GridH2Table tbl) {
+        final GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        if (desc != null && desc.context() != null) {
+            GridKernalContext ctx = desc.context().kernalContext();
+
+            log = ctx.log(getClass());
+
+            msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName());
+
+            msgLsnr = new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    GridSpinBusyLock l = desc.indexing().busyLock();
+
+                    if (!l.enterBusy())
+                        return;
+
+                    try {
+                        onMessage0(nodeId, msg);
+                    }
+                    finally {
+                        l.leaveBusy();
+                    }
+                }
+            };
+
+            ctx.io().addMessageListener(msgTopic, msgLsnr);
+        }
+        else {
+            msgTopic = null;
+            msgLsnr = null;
+            log = new NullLogger();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void close(Session ses) {
+        // No-op. Actual index destruction must happen in method destroy.
     }
 
     /**
-     * Sets key filters for current thread.
-     *
-     * @param fs Filters.
+     * Attempts to destroy the index and release all the resources.
+     * We use this method instead of {@link #close(Session)} because that method
+     * is used by H2 internally.
      */
-    public static void setFiltersForThread(IndexingQueryFilter fs) {
-        filters.set(fs);
+    public void destroy() {
+        if (msgLsnr != null)
+            kernalContext().io().removeMessageListener(msgTopic, msgLsnr);
     }
 
     /**
@@ -92,71 +207,121 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * Takes or sets existing snapshot to be used in current thread.
      *
      * @param s Optional existing snapshot to use.
+     * @param qctx Query context.
      * @return Snapshot.
      */
-    public Object takeSnapshot(@Nullable Object s) {
+    public final Object takeSnapshot(@Nullable Object s, GridH2QueryContext qctx) {
+        assert snapshot.get() == null;
+
+        if (s == null)
+            s = doTakeSnapshot();
+
+        if (s != null) {
+            if (s instanceof GridReservable && !((GridReservable)s).reserve())
+                return null;
+
+            snapshot.set(s);
+
+            if (qctx != null)
+                qctx.putSnapshot(idxId, s);
+        }
+
         return s;
     }
 
     /**
-     * Releases snapshot for current thread.
+     * @param ses Session.
      */
-    public void releaseSnapshot() {
-        // No-op.
+    private static void clearViewIndexCache(Session ses) {
+        Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
+
+        if (!viewIdxCache.isEmpty())
+            viewIdxCache.clear();
     }
 
-    /** {@inheritDoc} */
-    @Override public int compareRows(SearchRow rowData, SearchRow compare) {
-        if (rowData == compare)
-            return 0;
+    /**
+     * @param ses Session.
+     * @param filters All joined table filters.
+     * @param filter Current filter.
+     * @return Multiplier.
+     */
+    public int getDistributedMultiplier(Session ses, TableFilter[] filters, int filter) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        for (int i = 0, len = indexColumns.length; i < len; i++) {
-            int index = columnIds[i];
+        // We do complex optimizations with respect to distributed joins only on the prepare stage,
+        // because on the run stage reordering of joined tables by the Optimizer is explicitly disabled
+        // and thus the multiplier will always be the same, so it will not affect the choice of index.
+        // Query expressions cannot be distributed either.
+        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+            return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
 
-            Value v1 = rowData.getValue(index);
-            Value v2 = compare.getValue(index);
+        // We have to clear this cache because normally sub-query plan cost does not depend on anything
+        // other than index condition masks and sort order, but in our case it can depend on order
+        // of previous table filters.
+        clearViewIndexCache(ses);
 
-            if (v1 == null || v2 == null)
-                return 0;
+        assert filters != null;
 
-            int c = compareValues(v1, v2, indexColumns[i].sortType);
+        GridH2CollocationModel c = buildCollocationModel(qctx, ses.getSubQueryInfo(), filters, filter, false);
 
-            if (c != 0)
-                return c;
-        }
-        return 0;
+        return c.calculateMultiplier();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridH2Table getTable() {
+        return (GridH2Table)super.getTable();
+    }
+
+    /**
+     * Takes and returns actual snapshot or {@code null} if snapshots are not supported.
+     *
+     * @return Snapshot or {@code null}.
+     */
+    @Nullable protected abstract Object doTakeSnapshot();
+
+    /**
+     * @return Thread local snapshot.
+     */
+    @SuppressWarnings("unchecked")
+    protected <T> T threadLocalSnapshot() {
+        return (T)snapshot.get();
     }
 
     /**
-     * @param a First value.
-     * @param b Second value.
-     * @param sortType Sort type.
-     * @return Comparison result.
+     * Releases snapshot for current thread.
      */
-    private int compareValues(Value a, Value b, int sortType) {
-        if (a == b)
-            return 0;
+    public void releaseSnapshot() {
+        Object s = snapshot.get();
+
+        assert s != null;
 
-        int comp = table.compareTypeSave(a, b);
+        snapshot.remove();
 
-        if ((sortType & SortOrder.DESCENDING) != 0)
-            comp = -comp;
+        if (s instanceof GridReservable)
+            ((GridReservable)s).release();
 
-        return comp;
+        if (s instanceof AutoCloseable)
+            U.closeQuiet((AutoCloseable)s);
     }
 
     /**
      * Filters rows from expired ones and using predicate.
      *
      * @param iter Iterator over rows.
+     * @param filter Optional filter.
      * @return Filtered iterator.
      */
-    protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter) {
-        IgniteBiPredicate<Object, Object> p = null;
+    protected Iterator<GridH2Row> filter(Iterator<GridH2Row> iter, IndexingQueryFilter filter) {
+        return new FilteringIterator(iter, U.currentTimeMillis(), filter, getTable().spaceName());
+    }
 
-        IndexingQueryFilter f = filters.get();
+    /**
+     * @return Filter for currently running query or {@code null} if none.
+     */
+    protected static IndexingQueryFilter threadLocalFilter() {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        return new FilteringIterator(iter, U.currentTimeMillis(), f);
+        return qctx != null ? qctx.filter() : null;
     }
 
     /** {@inheritDoc} */
@@ -194,10 +359,1125 @@ public abstract class GridH2IndexBase extends BaseIndex {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+            return null;
+
+        IndexColumn affCol = getTable().getAffinityKeyColumn();
+
+        int affColId;
+        boolean ucast;
+
+        if (affCol != null) {
+            affColId = affCol.column.getColumnId();
+            int[] masks = filter.getMasks();
+            ucast = masks != null && masks[affColId] == IndexCondition.EQUALITY;
+        }
+        else {
+            affColId = -1;
+            ucast = false;
+        }
+
+        GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+
+        return new DistributedLookupBatch(cctx, ucast, affColId);
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param msg Message.
+     */
+    private void send(Collection<ClusterNode> nodes, Message msg) {
+        if (!getTable().rowDescriptor().indexing().send(msgTopic,
+            -1,
+            nodes,
+            msg,
+            null,
+            locNodeHnd,
+            GridIoPolicy.IDX_POOL,
+            false))
+            throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
+    }
+
+    /**
+     * @param nodeId Source node ID.
+     * @param msg Message.
+     */
+    private void onMessage0(UUID nodeId, Object msg) {
+        ClusterNode node = kernalContext().discovery().node(nodeId);
+
+        if (node == null)
+            return;
+
+        try {
+            if (msg instanceof GridH2IndexRangeRequest)
+                onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+            else if (msg instanceof GridH2IndexRangeResponse)
+                onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+        }
+        catch (Throwable th) {
+            U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+            if (th instanceof Error)
+                throw th;
+        }
+    }
+
+    /**
+     * @return Kernal context.
+     */
+    private GridKernalContext kernalContext() {
+        return getTable().rowDescriptor().context().kernalContext();
+    }
+
+    /**
+     * @param node Requesting node.
+     * @param msg Request message.
+     */
+    private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
+            msg.originNodeId(),
+            msg.queryId(),
+            MAP);
+
+        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+
+        res.originNodeId(msg.originNodeId());
+        res.queryId(msg.queryId());
+        res.batchLookupId(msg.batchLookupId());
+
+        if (qctx == null)
+            res.status(STATUS_NOT_FOUND);
+        else {
+            try {
+                RangeSource src;
+
+                if (msg.bounds() != null) {
+                    // This is the first request containing all the search rows.
+                    ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
+
+                    assert !msg.bounds().isEmpty() : "empty bounds";
+
+                    src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+                }
+                else {
+                    // This is request to fetch next portion of data.
+                    src = qctx.getSource(node.id(), msg.batchLookupId());
+
+                    assert src != null;
+                }
+
+                List<GridH2RowRange> ranges = new ArrayList<>();
+
+                int maxRows = qctx.pageSize();
+
+                assert maxRows > 0 : maxRows;
+
+                while (maxRows > 0) {
+                    GridH2RowRange range = src.next(maxRows);
+
+                    if (range == null)
+                        break;
+
+                    ranges.add(range);
+
+                    if (range.rows() != null)
+                        maxRows -= range.rows().size();
+                }
+
+                if (src.hasMoreRows()) {
+                    // Save source for future fetches.
+                    if (msg.bounds() != null)
+                        qctx.putSource(node.id(), msg.batchLookupId(), src);
+                }
+                else if (msg.bounds() == null) {
+                    // Drop saved source.
+                    qctx.putSource(node.id(), msg.batchLookupId(), null);
+                }
+
+                assert !ranges.isEmpty();
+
+                res.ranges(ranges);
+                res.status(STATUS_OK);
+            }
+            catch (Throwable th) {
+                U.error(log, "Failed to process request: " + msg, th);
+
+                res.error(th.getClass() + ": " + th.getMessage());
+                res.status(STATUS_ERROR);
+            }
+        }
+
+        send(singletonList(node), res);
+    }
+
+    /**
+     * @param node Responded node.
+     * @param msg Response message.
+     */
+    private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
+            msg.originNodeId(), msg.queryId(), MAP);
+
+        if (qctx == null)
+            return;
+
+        Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+
+        if (streams == null)
+            return;
+
+        RangeStream stream = streams.get(node);
+
+        assert stream != null;
+
+        stream.onResponse(msg);
+    }
+
+    /**
+     * @param v1 First value.
+     * @param v2 Second value.
+     * @return {@code true} If they are equal.
+     */
+    private boolean equal(Value v1, Value v2) {
+        return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, getDatabase().getCompareMode()) == 0);
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param batchLookupId Batch lookup ID.
+     * @return Index range request.
+     */
+    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+        GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
+
+        req.originNodeId(qctx.originNodeId());
+        req.queryId(qctx.queryId());
+        req.batchLookupId(batchLookupId);
+
+        return req;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param cctx Cache context.
+     * @return Collection of nodes for broadcasting.
+     */
+    private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+        Map<UUID, int[]> partMap = qctx.partitionsMap();
+
+        List<ClusterNode> res;
+
+        if (partMap == null)
+            res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+        else {
+            res = new ArrayList<>(partMap.size());
+
+            GridKernalContext ctx = kernalContext();
+
+            for (UUID nodeId : partMap.keySet()) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    throw new GridH2RetryException("Failed to find node.");
+
+                res.add(node);
+            }
+        }
+
+        if (F.isEmpty(res))
+            throw new GridH2RetryException("Failed to collect affinity nodes.");
+
+        return res;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param qctx Query context.
+     * @param affKeyObj Affinity key.
+     * @return Cluster node for the given affinity key.
+     */
+    private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+        assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
+
+        ClusterNode node;
+
+        if (qctx.partitionsMap() != null) {
+            // If we have explicit partitions map, we have to use it to calculate affinity node.
+            UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+
+            node = cctx.discovery().node(nodeId);
+        }
+        else // Get primary node for current topology version.
+            node = cctx.affinity().primary(affKeyObj, qctx.topologyVersion());
+
+        if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+            throw new GridH2RetryException("Failed to find node.");
+
+        return node;
+    }
+
+    /**
+     * @param row Row.
+     * @return Row message.
+     */
+    private GridH2RowMessage toRowMessage(Row row) {
+        if (row == null)
+            return null;
+
+        int cols = row.getColumnCount();
+
+        assert cols > 0 : cols;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
+
+        for (int i = 0; i < cols; i++) {
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * @param msg Row message.
+     * @return Search row.
+     */
+    private SearchRow toSearchRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        GridKernalContext ctx = kernalContext();
+
+        Value[] vals = new Value[getTable().getColumns().length];
+
+        assert vals.length > 0;
+
+        List<GridH2ValueMessage> msgVals = msg.values();
+
+        for (int i = 0; i < indexColumns.length; i++) {
+            if (i >= msgVals.size())
+                continue;
+
+            try {
+                vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return database.createRow(vals, MEMORY_CALCULATE);
+    }
+
+    /**
+     * @param row Search row.
+     * @return Row message.
+     */
+    private GridH2RowMessage toSearchRowMessage(SearchRow row) {
+        if (row == null)
+            return null;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
+
+        for (IndexColumn idxCol : indexColumns) {
+            Value val = row.getValue(idxCol.column.getColumnId());
+
+            if (val == null)
+                break;
+
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(val));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * @param msg Message.
+     * @return Row.
+     */
+    private Row toRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        GridKernalContext ctx = kernalContext();
+
+        List<GridH2ValueMessage> vals = msg.values();
+
+        assert !F.isEmpty(vals) : vals;
+
+        Value[] vals0 = new Value[vals.size()];
+
+        for (int i = 0; i < vals0.length; i++) {
+            try {
+                vals0[i] = vals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return database.createRow(vals0, MEMORY_CALCULATE);
+    }
+
+    /**
+     * Simple cursor from a single node.
+     */
+    private static class UnicastCursor implements Cursor {
+        /** */
+        final int rangeId;
+
+        /** */
+        RangeStream stream;
+
+        /**
+         * @param rangeId Range ID.
+         * @param nodes Remote nodes.
+         * @param rangeStreams Range streams.
+         */
+        private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
+            assert nodes.size() == 1;
+
+            this.rangeId = rangeId;
+            this.stream = rangeStreams.get(F.first(nodes));
+
+            assert stream != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            return stream.next(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return stream.get(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Merge cursor from multiple nodes.
+     */
+    private class BroadcastCursor implements Cursor, Comparator<RangeStream> {
+        /** */
+        final int rangeId;
+
+        /** */
+        final RangeStream[] streams;
+
+        /** */
+        boolean first = true;
+
+        /** */
+        int off;
+
+        /**
+         * @param rangeId Range ID.
+         * @param nodes Remote nodes.
+         * @param rangeStreams Range streams.
+         */
+        private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
+            assert nodes.size() > 1;
+
+            this.rangeId = rangeId;
+
+            streams = new RangeStream[nodes.size()];
+
+            int i = 0;
+
+            for (ClusterNode node : nodes) {
+                RangeStream stream = rangeStreams.get(node);
+
+                assert stream != null;
+
+                streams[i++] = stream;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(RangeStream o1, RangeStream o2) {
+            if (o1 == o2)
+                return 0;
+
+            // Nulls are sorted to the beginning of the array.
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return compareRows(o1.get(rangeId), o2.get(rangeId));
+        }
+
+        /**
+         * Try to fetch the first row.
+         *
+         * @return {@code true} If we were able to find at least one row.
+         */
+        private boolean goFirst() {
+            // Fetch first row from all the streams and sort them.
+            for (int i = 0; i < streams.length; i++) {
+                if (!streams[i].next(rangeId)) {
+                    streams[i] = null;
+                    off++; // After sorting, this offset will cut off all the null elements at the beginning of the array.
+                }
+            }
+
+            if (off == streams.length)
+                return false;
+
+            Arrays.sort(streams, this);
+
+            return true;
+        }
+
+        /**
+         * Fetch next row.
+         *
+         * @return {@code true} If the next row was found.
+         */
+        private boolean goNext() {
+            assert off != streams.length;
+
+            if (!streams[off].next(rangeId)) {
+                // The next row from the current min stream was not found: nullify that stream and bump the offset forward.
+                streams[off] = null;
+
+                return ++off != streams.length;
+            }
+
+            // Bubble the current min stream up according to the newly fetched row to keep the streams sorted.
+            for (int i = off, last = streams.length - 1; i < last; i++) {
+                if (compareRows(streams[i].get(rangeId), streams[i + 1].get(rangeId)) <= 0)
+                    break;
+
+                U.swap(streams, i, i + 1);
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (first) {
+                first = false;
+
+                return goFirst();
+            }
+
+            return goNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return streams[off].get(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Index lookup batch.
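+     * Collects search row ranges for a batched index lookup and routes them to the
+     * data nodes: unicast when a single affinity key is known, broadcast otherwise.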
+     */
+    private class DistributedLookupBatch implements IndexLookupBatch {
+        /** */
+        final GridCacheContext<?,?> cctx;
+
+        /** */
+        final boolean ucast;
+
+        /** */
+        final int affColId;
+
+        /** */
+        GridH2QueryContext qctx;
+
+        /** */
+        int batchLookupId;
+
+        /** */
+        Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap();
+
+        /** */
+        List<ClusterNode> broadcastNodes;
+
+        /** */
+        List<Future<Cursor>> res = Collections.emptyList();
+
+        /** */
+        boolean batchFull;
+
+        /** */
+        boolean findCalled;
+
+        /**
+         * @param cctx Cache context.
+         * @param ucast Unicast or broadcast query.
+         * @param affColId Affinity column ID.
+         */
+        private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+            this.cctx = cctx;
+            this.ucast = ucast;
+            this.affColId = affColId;
+        }
+
+        /**
+         * @param firstRow First row.
+         * @param lastRow Last row.
+         * @return Affinity key or {@code null}.
+         */
+        private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
+            if (firstRow == null || lastRow == null)
+                return null;
+
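+            // First check whether the affinity column itself is bound to the same
+            // value in both the lower and the upper search bound.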
+            Value affKeyFirst = firstRow.getValue(affColId);
+            Value affKeyLast = lastRow.getValue(affColId);
+
+            if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
+                return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject();
+
+            if (affColId == KEY_COL)
+                return null;
+
+            // Try to extract affinity key from primary key.
+            Value pkFirst = firstRow.getValue(KEY_COL);
+            Value pkLast = lastRow.getValue(KEY_COL);
+
+            if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
+                return EXPLICIT_NULL;
+
+            if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
+                return null;
+
+            Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject());
+            Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
+
+            if (pkAffKeyFirst == null || pkAffKeyLast == null)
+                throw new CacheException("Cache key without affinity key.");
+
+            if (pkAffKeyFirst.equals(pkAffKeyLast))
+                return pkAffKeyFirst;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
+            if (qctx == null || findCalled) {
+                if (qctx == null) {
+                    // This is the first call after the query has begun (possibly after a reuse);
+                    // reinitialize the query context and the result list.
+                    qctx = GridH2QueryContext.get();
+                    res = new ArrayList<>();
+
+                    assert qctx != null;
+                    assert !findCalled;
+                }
+                else {
+                    // Cleanup after the previous lookup phase.
+                    assert batchLookupId != 0;
+
+                    findCalled = false;
+                    qctx.putStreams(batchLookupId, null);
+                    res.clear();
+                }
+
+                // Reinitialize for the next lookup phase.
+                batchLookupId = qctx.nextBatchLookupId();
+                rangeStreams = new HashMap<>();
+            }
+
+            Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
+
+            List<ClusterNode> nodes;
+            Future<Cursor> fut;
+
+            if (affKey != null) {
+                // Affinity key is provided.
+                if (affKey == EXPLICIT_NULL) // Affinity key is an explicit null, we will not find anything.
+                    return false;
+
+                nodes = F.asList(rangeNode(cctx, qctx, affKey));
+            }
+            else {
+                // Affinity key is not provided or differs between the upper and lower bounds, so we have to broadcast.
+                if (broadcastNodes == null)
+                    broadcastNodes = broadcastNodes(qctx, cctx);
+
+                nodes = broadcastNodes;
+            }
+
+            assert !F.isEmpty(nodes) : nodes;
+
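+            // Each call adds one search range; its ID is the index of the resulting
+            // cursor future in the result list.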
+            final int rangeId = res.size();
+
+            // Create messages.
+            GridH2RowMessage first = toSearchRowMessage(firstRow);
+            GridH2RowMessage last = toSearchRowMessage(lastRow);
+
+            // Range containing upper and lower bounds.
+            GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
+
+            // Add range to every message of every participating node.
+            for (int i = 0; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+                assert node != null;
+
+                RangeStream stream = rangeStreams.get(node);
+
+                List<GridH2RowRangeBounds> bounds;
+
+                if (stream == null) {
+                    stream = new RangeStream(qctx, node);
+
+                    stream.req = createRequest(qctx, batchLookupId);
+                    stream.req.bounds(bounds = new ArrayList<>());
+
+                    rangeStreams.put(node, stream);
+                }
+                else
+                    bounds = stream.req.bounds();
+
+                bounds.add(rangeBounds);
+
+                // The batch is considered full as soon as at least one node has a full page of ranges.
+                if (bounds.size() >= qctx.pageSize())
+                    batchFull = true;
+            }
+
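+            // The future is completed immediately; rows are pulled lazily from the
+            // per-node range streams when the cursor is iterated.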
+            fut = new DoneFuture<>(nodes.size() == 1 ?
+                new UnicastCursor(rangeId, nodes, rangeStreams) :
+                new BroadcastCursor(rangeId, nodes, rangeStreams));
+
+            res.add(fut);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isBatchFull() {
+            return batchFull;
+        }
+
+        /**
+         * Registers the collected range streams in the query context and starts streaming.
+         */
+        private void startStreams() {
+            if (rangeStreams.isEmpty()) {
+                assert res.isEmpty();
+
+                return;
+            }
+
+            qctx.putStreams(batchLookupId, rangeStreams);
+
+            // Start streaming.
+            for (RangeStream stream : rangeStreams.values())
+                stream.start();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<Future<Cursor>> find() {
+            batchFull = false;
+            findCalled = true;
+
+            startStreams();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset(boolean beforeQry) {
+            if (beforeQry || qctx == null) // Query context can be null if addSearchRows was never called.
+                return;
+
+            assert batchLookupId != 0;
+
+            // Do cleanup after the query run.
+            qctx.putStreams(batchLookupId, null);
+            qctx = null; // The same query can be reused multiple times for different query contexts.
+            batchLookupId = 0;
+
+            rangeStreams = Collections.emptyMap();
+            broadcastNodes = null;
+            batchFull = false;
+            findCalled = false;
+            res = Collections.emptyList();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getPlanSQL() {
+            return ucast ? "unicast" : "broadcast";
+        }
+    }
+
+    /**
+     * Per node range stream.
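+     * Sends index range requests to a single node and pages through the responses.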
+     */
+    private class RangeStream {
+        /** */
+        final GridH2QueryContext qctx;
+
+        /** */
+        final ClusterNode node;
+
+        /** */
+        GridH2IndexRangeRequest req;
+
+        /** */
+        int remainingRanges;
+
+        /** */
+        final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>();
+
+        /** */
+        Iterator<GridH2RowRange> ranges = emptyIterator();
+
+        /** */
+        Cursor cursor = GridH2Cursor.EMPTY;
+
+        /** */
+        int cursorRangeId = -1;
+
+        /**
+         * @param qctx Query context.
+         * @param node Node.
+         */
+        RangeStream(GridH2QueryContext qctx, ClusterNode node) {
+            this.node = node;
+            this.qctx = qctx;
+        }
+
+        /**
+         * Start streaming.
+         */
+        private void start() {
+            remainingRanges = req.bounds().size();
+
+            assert remainingRanges > 0;
+
+            if (log.isDebugEnabled())
+                log.debug("Starting stream: [node=" + node + ", req=" + req + "]");
+
+            send(singletonList(node), req);
+        }
+
+        /**
+         * @param msg Response.
+         */
+        public void onResponse(GridH2IndexRangeResponse msg) {
+            respQueue.add(msg);
+        }
+
+        /**
+         * @return Response.
+         */
+        private GridH2IndexRangeResponse awaitForResponse() {
+            assert remainingRanges > 0;
+
+            final long start = U.currentTimeMillis();
+
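+            // Poll for a response, periodically re-checking that the query was not
+            // cancelled, the local node is not stopping and the remote node is alive.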
+            for (int attempt = 0;; attempt++) {
+                if (qctx.isCleared())
+                    throw new GridH2RetryException("Query is cancelled.");
+
+                if (kernalContext().isStopping())
+                    throw new GridH2RetryException("Stopping node.");
+
+                GridH2IndexRangeResponse res;
+
+                try {
+                    res = respQueue.poll(500, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e) {
+                    throw new GridH2RetryException("Interrupted.");
+                }
+
+                if (res != null) {
+                    switch (res.status()) {
+                        case STATUS_OK:
+                            List<GridH2RowRange> ranges0 = res.ranges();
+
+                            remainingRanges -= ranges0.size();
+
+                            if (ranges0.get(ranges0.size() - 1).isPartial())
+                                remainingRanges++;
+
+                            if (remainingRanges > 0) {
+                                if (req.bounds() != null)
+                                    req = createRequest(qctx, req.batchLookupId());
+
+                                // Prefetch next page.
+                                send(singletonList(node), req);
+                            }
+                            else
+                                req = null;
+
+                            return res;
+
+                        case STATUS_NOT_FOUND:
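+                            // The remote node has no result for this request yet: resend the
+                            // original request with a growing delay, up to a 30 second timeout.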
+                            if (req == null || req.bounds() == null) // We have already received the first response.
+                                throw new GridH2RetryException("Failure on remote node.");
+
+                            if (U.currentTimeMillis() - start > 30_000)
+                                throw new GridH2RetryException("Timeout.");
+
+                            try {
+                                U.sleep(20 * attempt);
+                            }
+                            catch (IgniteInterruptedCheckedException e) {
+                                throw new IgniteInterruptedException(e.getMessage());
+                            }
+
+                            // Resend the request after the delay.
+                            send(singletonList(node), req);
+
+                            break;
+
+                        case STATUS_ERROR:
+                            throw new CacheException(res.error());
+
+                        default:
+                            throw new IllegalStateException();
+                    }
+                }
+
+                if (!kernalContext().discovery().alive(node))
+                    throw new GridH2RetryException("Node left: " + node);
+            }
+        }
+
+        /**
+         * @param rangeId Requested range ID.
+         * @return {@code true} If the next row for the requested range was found.
+         */
+        private boolean next(final int rangeId) {
+            for (;;) {
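+                // Ranges are consumed in ascending range ID order: if the requested ID is
+                // behind the current cursor range, that range has already been exhausted.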
+                if (rangeId == cursorRangeId) {
+                    if (cursor.next())
+                        return true;
+                }
+                else if (rangeId < cursorRangeId)
+                    return false;
+
+                cursor = GridH2Cursor.EMPTY;
+
+                while (!ranges.hasNext()) {
+                    if (remainingRanges == 0) {
+                        ranges = emptyIterator();
+
+                        return false;
+                    }
+
+                    ranges = awaitForResponse().ranges().iterator();
+                }
+
+                GridH2RowRange range = ranges.next();
+
+                cursorRangeId = range.rangeId();
+
+                if (!F.isEmpty(range.rows())) {
+                    final Iterator<GridH2RowMessage> it = range.rows().iterator();
+
+                    if (it.hasNext()) {
+                        cursor = new GridH2Cursor(new Iterator<Row>() {
+                            @Override public boolean hasNext() {
+                                return it.hasNext();
+                            }
+
+                            @Override public Row next() {
+                                // Lazily convert messages into real rows.
+                                return toRow(it.next());
+                            }
+
+                            @Override public void remove() {
+                                throw new UnsupportedOperationException();
+                            }
+                        });
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param rangeId Requested range ID.
+         * @return Current row.
+         */
+        private Row get(int rangeId) {
+            assert rangeId == cursorRangeId;
+
+            return cursor.get();
+        }
+    }
+
+    /**
+     * Bounds iterator.
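+     * Iterates over the requested bounds and produces row ranges from the local
+     * index tree, splitting ranges larger than the requested maximum into partial pages.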
+     */
+    private class RangeSource {
+        /** */
+        Iterator<GridH2RowRangeBounds> boundsIter;
+
+        /** */
+        int curRangeId = -1;
+
+        /** */
+        Iterator<GridH2Row> curRange = emptyIterator();
+
+        /** */
+        final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+
+        /** */
+        final IndexingQueryFilter filter;
+
+        /**
+         * @param bounds Bounds.
+         * @param tree Snapshot.
+         * @param filter Filter.
+         */
+        RangeSource(
+            Iterable<GridH2RowRangeBounds> bounds,
+            ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
+            IndexingQueryFilter filter
+        ) {
+            this.filter = filter;
+            this.tree = tree;
+            boundsIter = bounds.iterator();
+        }
+
+        /**
+         * @return {@code true} If there are more rows in this source.
+         */
+        public boolean hasMoreRows() {
+            return boundsIter.hasNext() || curRange.hasNext();
+        }
+
+        /**
+         * @param maxRows Max allowed rows.
+         * @return Range.
+         */
+        public GridH2RowRange next(int maxRows) {
+            assert maxRows > 0 : maxRows;
+
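+            // Return at most maxRows rows per call; a range that does not fit is marked
+            // partial and continued on the next call.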
+            for (;;) {
+                if (curRange.hasNext()) {
+                    // Here we are fetching the remaining rows of the previously partially fetched range.
+                    List<GridH2RowMessage> rows = new ArrayList<>();
+
+                    GridH2RowRange nextRange = new GridH2RowRange();
+
+                    nextRange.rangeId(curRangeId);
+                    nextRange.rows(rows);
+
+                    do {
+                        rows.add(toRowMessage(curRange.next()));
+                    }
+                    while (rows.size() < maxRows && curRange.hasNext());
+
+                    if (curRange.hasNext())
+                        nextRange.setPartial();
+                    else
+                        curRange = emptyIterator();
+
+                    return nextRange;
+                }
+
+                curRange = emptyIterator();
+
+                if (!boundsIter.hasNext()) {
+                    boundsIter = emptyIterator();
+
+                    return null;
+                }
+
+                GridH2RowRangeBounds bounds = boundsIter.next();
+
+                curRangeId = bounds.rangeId();
+
+                SearchRow first = toSearchRow(bounds.first());
+                SearchRow last = toSearchRow(bounds.last());
+
+                ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+
+                curRange = doFind0(t, first, true, last, filter);
+
+                if (!curRange.hasNext()) {
+                    // We have to return an empty range here.
+                    GridH2RowRange emptyRange = new GridH2RowRange();
+
+                    emptyRange.rangeId(curRangeId);
+
+                    return emptyRange;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Snapshot for current thread if there is one.
+     */
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param t Tree.
+     * @param first Lower bound.
+     * @param includeFirst Whether lower bound should be inclusive.
+     * @param last Upper bound (always inclusive).
+     * @param filter Filter.
+     * @return Iterator over rows in given range.
+     */
+    protected Iterator<GridH2Row> doFind0(ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t,
+        @Nullable SearchRow first,
+        boolean includeFirst,
+        @Nullable SearchRow last,
+        IndexingQueryFilter filter) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Iterator which filters by expiration time and predicate.
      */
-    protected class FilteringIterator extends GridFilteredIterator<GridH2Row> {
+    protected static class FilteringIterator extends GridFilteredIterator<GridH2Row> {
         /** */
         private final IgniteBiPredicate<Object, Object> fltr;
 
@@ -210,15 +1490,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
         /**
          * @param iter Iterator.
          * @param time Time for expired rows filtering.
+         * @param qryFilter Filter.
+         * @param spaceName Space name.
          */
-        protected FilteringIterator(Iterator<GridH2Row> iter, long time,
-            IndexingQueryFilter qryFilter) {
+        protected FilteringIterator(Iterator<GridH2Row> iter,
+            long time,
+            IndexingQueryFilter qryFilter,
+            String spaceName) {
             super(iter);
 
             this.time = time;
 
             if (qryFilter != null) {
-                this.fltr = qryFilter.forSpace(((GridH2Table)getTable()).spaceName());
+                this.fltr = qryFilter.forSpace(spaceName);
 
                 this.isValRequired = qryFilter.isValueRequired();
             } else {
@@ -242,8 +1526,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
             if (fltr == null)
                 return true;
 
-            Object key = row.getValue(keyCol).getObject();
-            Object val = isValRequired ? row.getValue(valCol).getObject() : null;
+            Object key = row.getValue(KEY_COL).getObject();
+            Object val = isValRequired ? row.getValue(VAL_COL).getObject() : null;
 
             assert key != null;
             assert !isValRequired || val != null;

