ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-1232
Date Fri, 15 Jul 2016 12:52:47 GMT
ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e48c1d49
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e48c1d49
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e48c1d49

Branch: refs/heads/ignite-1232
Commit: e48c1d4907b287e8ffbd6c616afd2c525fd142c5
Parents: 5c2add4
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 15 09:31:58 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 15 15:52:41 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |   12 +
 .../processors/query/GridQueryProcessor.java    |   10 +-
 .../query/h2/opt/GridH2SpatialIndex.java        |    2 +
 .../query/h2/opt/GridH2CollocationModel.java    |   50 +-
 .../query/h2/opt/GridH2IndexBase.java           | 1234 ++++++++++++++++-
 .../processors/query/h2/opt/GridH2Table.java    |   34 +-
 .../query/h2/opt/GridH2TreeIndex.java           | 1240 +-----------------
 ...acheDistributedJoinCustomAffinityMapper.java |  115 +-
 ...ributedJoinPartitionedAndReplicatedTest.java |  129 +-
 .../query/IgniteSqlSplitterSelfTest.java        |  297 +++--
 .../IgniteCacheQuerySelfTestSuite.java          |    2 +
 11 files changed, 1666 insertions(+), 1459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index bd38355..fec43d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -245,6 +245,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private boolean deferredDel;
 
+    /** */
+    private boolean customAffMapper;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -365,6 +368,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache.
+     */
+    public boolean customAffinityMapper() {
+        return customAffMapper;
+    }
+
+    /**
      * @param dynamicDeploymentId Dynamic deployment ID.
      */
     void dynamicDeploymentId(IgniteUuid dynamicDeploymentId) {
@@ -1139,6 +1149,8 @@ public class GridCacheContext<K, V> implements Externalizable {
      */
     public void cacheObjectContext(CacheObjectContext cacheObjCtx) {
         this.cacheObjCtx = cacheObjCtx;
+
+        customAffMapper = cacheCfg.getAffinityMapper().getClass() != cacheObjCtx.defaultAffMapper().getClass();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 3d8120d..9b62a45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -279,11 +279,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         if (valCls != null)
                             altTypeId = new TypeId(ccfg.getName(), valCls);
 
-                        // Need to setup affinity key for distributed joins.
-                        String affField = ctx.cacheObjects().affinityField(qryEntity.getKeyType());
+                        if (!cctx.customAffinityMapper()) {
+                            // Need to setup affinity key for distributed joins.
+                            String affField = ctx.cacheObjects().affinityField(qryEntity.getKeyType());
 
-                        if (affField != null)
-                            desc.affinityKey(affField);
+                            if (affField != null)
+                                desc.affinityKey(affField);
+                        }
                     }
                     else {
                         processClassMeta(qryEntity, desc, coCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index a64c75b..3062d13 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -226,6 +226,8 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
         finally {
             l.unlock();
         }
+
+        super.destroy();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
index aec5920..2f35792 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -49,7 +49,7 @@ public final class GridH2CollocationModel {
     private static final int MULTIPLIER_BROADCAST = 200;
 
     /** */
-    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 1000;
+    private static final int MULTIPLIER_REPLICATED_NOT_LAST = 10_000;
 
     /** */
     private final GridH2CollocationModel upper;
@@ -85,6 +85,7 @@ public final class GridH2CollocationModel {
      * @param upper Upper.
      * @param filter Filter.
      * @param view This model will be a subquery (or top level query) and must contain child filters.
+     * @param validate Query validation flag.
      */
     private GridH2CollocationModel(GridH2CollocationModel upper, int filter, boolean view, boolean validate) {
         this.upper = upper;
@@ -98,6 +99,7 @@ public final class GridH2CollocationModel {
      * @param filter Filter.
      * @param unions Unions.
      * @param view This model will be a subquery (or top level query) and must contain child filters.
+     * @param validate Query validation flag.
      * @return Created child collocation model.
      */
     private static GridH2CollocationModel createChildModel(GridH2CollocationModel upper,
@@ -320,7 +322,14 @@ public final class GridH2CollocationModel {
     private Affinity joinedWithCollocated(int f) {
         TableFilter tf = childFilters[f];
 
-        IndexColumn affCol = ((GridH2Table)tf.getTable()).getAffinityKeyColumn();
+        GridH2Table tbl = (GridH2Table)tf.getTable();
+
+        if (validate) {
+            if (tbl.rowDescriptor().context().customAffinityMapper())
+                throw customAffinityError(tbl.spaceName());
+        }
+
+        IndexColumn affCol = tbl.getAffinityKeyColumn();
 
         boolean affKeyCondFound = false;
 
@@ -352,7 +361,7 @@ public final class GridH2CollocationModel {
                             if (cm != null) {
                                 Type t = cm.type(true);
 
-                                if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol))
+                                if (t.isPartitioned() && t.isCollocated() && isAffinityColumn(prevJoin, expCol, validate))
                                     return Affinity.JOINED_WITH_COLLOCATED;
                             }
                         }
@@ -380,9 +389,10 @@ public final class GridH2CollocationModel {
     /**
      * @param f Table filter.
      * @param expCol Expression column.
+     * @param validate Query validation flag.
      * @return {@code true} It it is an affinity column.
      */
-    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol) {
+    private static boolean isAffinityColumn(TableFilter f, ExpressionColumn expCol, boolean validate) {
         Column col = expCol.getColumn();
 
         if (col == null)
@@ -393,12 +403,15 @@ public final class GridH2CollocationModel {
         if (t.isView()) {
             Query qry = getSubQuery(f);
 
-            return isAffinityColumn(qry, expCol);
+            return isAffinityColumn(qry, expCol, validate);
         }
 
         if (t instanceof GridH2Table) {
             IndexColumn affCol = ((GridH2Table)t).getAffinityKeyColumn();
 
+            if (validate && ((GridH2Table)t).rowDescriptor().context().customAffinityMapper())
+                throw customAffinityError(((GridH2Table)t).spaceName());
+
             return affCol != null && col.getColumnId() == affCol.column.getColumnId();
         }
 
@@ -408,13 +421,14 @@ public final class GridH2CollocationModel {
     /**
      * @param qry Query.
      * @param expCol Expression column.
+     * @param validate Query validation flag.
      * @return {@code true} It it is an affinity column.
      */
-    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol) {
+    private static boolean isAffinityColumn(Query qry, ExpressionColumn expCol, boolean validate) {
         if (qry.isUnion()) {
             SelectUnion union = (SelectUnion)qry;
 
-            return isAffinityColumn(union.getLeft(), expCol) && isAffinityColumn(union.getRight(), expCol);
+            return isAffinityColumn(union.getLeft(), expCol, validate) && isAffinityColumn(union.getRight(), expCol, validate);
         }
 
         Expression exp = qry.getExpressions().get(expCol.getColumn().getColumnId()).getNonAliasExpression();
@@ -422,7 +436,7 @@ public final class GridH2CollocationModel {
         if (exp instanceof ExpressionColumn) {
             expCol = (ExpressionColumn)exp;
 
-            return isAffinityColumn(expCol.getTableFilter(), expCol);
+            return isAffinityColumn(expCol.getTableFilter(), expCol, validate);
         }
 
         return false;
@@ -548,6 +562,7 @@ public final class GridH2CollocationModel {
      * @param info Sub-query info.
      * @param filters Filters.
      * @param filter Filter.
+     * @param validate Query validation flag.
      * @return Collocation.
      */
     public static GridH2CollocationModel buildCollocationModel(GridH2QueryContext qctx, SubQueryInfo info,
@@ -610,7 +625,7 @@ public final class GridH2CollocationModel {
         Type type = mdl.type(true);
 
         if (!type.isCollocated() && mdl.multiplier == MULTIPLIER_REPLICATED_NOT_LAST)
-            throw new CacheException("Failed to execute query: for distributed join, " +
+            throw new CacheException("Failed to execute query: for distributed join " +
                 "all REPLICATED caches must be at the end of the joined tables list.");
 
         return type.isCollocated();
@@ -621,10 +636,14 @@ public final class GridH2CollocationModel {
      * @param filter Filter.
      * @param qry Query.
      * @param unions Unions.
+     * @param validate Query validation flag.
      * @return Built model.
      */
-    private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper, int filter, Query qry,
-        List<GridH2CollocationModel> unions, boolean validate) {
+    private static GridH2CollocationModel buildCollocationModel(GridH2CollocationModel upper,
+        int filter,
+        Query qry,
+        List<GridH2CollocationModel> unions,
+        boolean validate) {
         if (qry.isUnion()) {
             if (unions == null)
                 unions = new ArrayList<>();
@@ -666,6 +685,15 @@ public final class GridH2CollocationModel {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return Error.
+     */
+    private static CacheException customAffinityError(String cacheName) {
+        return new CacheException("Can not use distributed joins for cache with custom AffinityKeyMapper configured. " +
+            "Please use AffinityKeyMapped annotation instead [cache=" + cacheName + ']');
+    }
+
+    /**
      * Collocation type.
      */
     private enum Type {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index ff3e463..42a51a0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,35 +17,88 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridFilteredIterator;
+import org.apache.ignite.internal.util.typedef.CIX2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.IndexCondition;
+import org.h2.index.IndexLookupBatch;
 import org.h2.index.ViewIndex;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
+import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
+import org.h2.util.DoneFuture;
+import org.h2.value.Value;
+import org.h2.value.ValueNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
+import static org.h2.result.Row.MEMORY_CALCULATE;
 
 /**
  * Index base.
  */
 public abstract class GridH2IndexBase extends BaseIndex {
     /** */
+    private static final Object EXPLICIT_NULL = new Object();
+
+    /** */
     private static final AtomicLong idxIdGen = new AtomicLong();
 
     /** */
@@ -54,8 +107,62 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /** */
     private final ThreadLocal<Object> snapshot = new ThreadLocal<>();
 
+    /** */
+    private Object msgTopic;
+
+    /** */
+    private GridMessageListener msgLsnr;
+
+    /** */
+    private IgniteLogger log;
+
+    /** */
+    private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
+            onMessage0(clusterNode.id(), msg);
+        }
+    };
+
+    /**
+     * @param tbl Table.
+     */
+    protected final void initDistributedJoinMessaging(GridH2Table tbl) {
+        final GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        if (desc != null && desc.context() != null) {
+            GridKernalContext ctx = desc.context().kernalContext();
+
+            log = ctx.log(getClass());
+
+            msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName());
+
+            msgLsnr = new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    GridSpinBusyLock l = desc.indexing().busyLock();
+
+                    if (!l.enterBusy())
+                        return;
+
+                    try {
+                        onMessage0(nodeId, msg);
+                    }
+                    finally {
+                        l.leaveBusy();
+                    }
+                }
+            };
+
+            ctx.io().addMessageListener(msgTopic, msgLsnr);
+        }
+        else {
+            msgTopic = null;
+            msgLsnr = null;
+            log = new NullLogger();
+        }
+    }
+
     /** {@inheritDoc} */
-    @Override public final void close(Session session) {
+    @Override public final void close(Session ses) {
         // No-op. Actual index destruction must happen in method destroy.
     }
 
@@ -64,7 +171,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * We use this method instead of {@link #close(Session)} because that method
      * is used by H2 internally.
      */
-    public abstract void destroy();
+    public void destroy() {
+        if (msgLsnr != null)
+            kernalContext().io().removeMessageListener(msgTopic, msgLsnr);
+    }
 
     /**
      * If the index supports rebuilding it has to creates its own copy.
@@ -122,10 +232,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param ses Session.
      */
     private static void clearViewIndexCache(Session ses) {
-        Map<Object,ViewIndex> viewIndexCache = ses.getViewIndexCache(true);
+        Map<Object,ViewIndex> viewIdxCache = ses.getViewIndexCache(true);
 
-        if (!viewIndexCache.isEmpty())
-            viewIndexCache.clear();
+        if (!viewIdxCache.isEmpty())
+            viewIdxCache.clear();
     }
 
     /**
@@ -248,6 +358,1110 @@ public abstract class GridH2IndexBase extends BaseIndex {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+            return null;
+
+        IndexColumn affCol = getTable().getAffinityKeyColumn();
+
+        int affColId;
+        boolean ucast;
+
+        if (affCol != null) {
+            affColId = affCol.column.getColumnId();
+            int[] masks = filter.getMasks();
+            ucast = masks != null && masks[affColId] == IndexCondition.EQUALITY;
+        }
+        else {
+            affColId = -1;
+            ucast = false;
+        }
+
+        GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+
+        return new DistributedLookupBatch(cctx, ucast, affColId);
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param msg Message.
+     */
+    private void send(Collection<ClusterNode> nodes, Message msg) {
+        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd,
+            GridIoPolicy.IDX_POOL, false))
+            throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
+    }
+
+    /**
+     * @param nodeId Source node ID.
+     * @param msg Message.
+     */
+    private void onMessage0(UUID nodeId, Object msg) {
+        ClusterNode node = kernalContext().discovery().node(nodeId);
+
+        if (node == null)
+            return;
+
+        try {
+            if (msg instanceof GridH2IndexRangeRequest)
+                onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+            else if (msg instanceof GridH2IndexRangeResponse)
+                onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+        }
+        catch (Throwable th) {
+            U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+            if (th instanceof Error)
+                throw th;
+        }
+    }
+
+    /**
+     * @return Kernal context.
+     */
+    private GridKernalContext kernalContext() {
+        return getTable().rowDescriptor().context().kernalContext();
+    }
+
+    /**
+     * @param node Requesting node.
+     * @param msg Request message.
+     */
+    private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
+            msg.originNodeId(),
+            msg.queryId(),
+            MAP);
+
+        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+
+        res.originNodeId(msg.originNodeId());
+        res.queryId(msg.queryId());
+        res.batchLookupId(msg.batchLookupId());
+
+        if (qctx == null)
+            res.status(STATUS_NOT_FOUND);
+        else {
+            try {
+                RangeSource src;
+
+                if (msg.bounds() != null) {
+                    // This is the first request containing all the search rows.
+                    ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
+
+                    assert !msg.bounds().isEmpty() : "empty bounds";
+
+                    src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+                }
+                else {
+                    // This is request to fetch next portion of data.
+                    src = qctx.getSource(node.id(), msg.batchLookupId());
+
+                    assert src != null;
+                }
+
+                List<GridH2RowRange> ranges = new ArrayList<>();
+
+                int maxRows = qctx.pageSize();
+
+                assert maxRows > 0 : maxRows;
+
+                while (maxRows > 0) {
+                    GridH2RowRange range = src.next(maxRows);
+
+                    if (range == null)
+                        break;
+
+                    ranges.add(range);
+
+                    if (range.rows() != null)
+                        maxRows -= range.rows().size();
+                }
+
+                if (src.hasMoreRows()) {
+                    // Save source for future fetches.
+                    if (msg.bounds() != null)
+                        qctx.putSource(node.id(), msg.batchLookupId(), src);
+                }
+                else if (msg.bounds() == null) {
+                    // Drop saved source.
+                    qctx.putSource(node.id(), msg.batchLookupId(), null);
+                }
+
+                assert !ranges.isEmpty();
+
+                res.ranges(ranges);
+                res.status(STATUS_OK);
+            }
+            catch (Throwable th) {
+                U.error(log, "Failed to process request: " + msg, th);
+
+                res.error(th.getClass() + ": " + th.getMessage());
+                res.status(STATUS_ERROR);
+            }
+        }
+
+        send(singletonList(node), res);
+    }
+
+    /**
+     * @param node Responded node.
+     * @param msg Response message.
+     */
+    private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
+            msg.originNodeId(), msg.queryId(), MAP);
+
+        if (qctx == null)
+            return;
+
+        Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+
+        if (streams == null)
+            return;
+
+        RangeStream stream = streams.get(node);
+
+        assert stream != null;
+
+        stream.onResponse(msg);
+    }
+
+    /**
+     * @param v1 First value.
+     * @param v2 Second value.
+     * @return {@code true} If they equal.
+     */
+    private boolean equal(Value v1, Value v2) {
+        return v1 == v2 || (v1 != null && v2 != null && v1.compareTypeSafe(v2, getDatabase().getCompareMode()) == 0);
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param batchLookupId Batch lookup ID.
+     * @return Index range request.
+     */
+    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+        GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
+
+        req.originNodeId(qctx.originNodeId());
+        req.queryId(qctx.queryId());
+        req.batchLookupId(batchLookupId);
+
+        return req;
+    }
+
+    /**
+     * @param qctx Query context.
+     * @param cctx Cache context.
+     * @return Collection of nodes for broadcasting.
+     */
+    private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+        Map<UUID, int[]> partMap = qctx.partitionsMap();
+
+        List<ClusterNode> res;
+
+        if (partMap == null)
+            res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+        else {
+            res = new ArrayList<>(partMap.size());
+
+            GridKernalContext ctx = kernalContext();
+
+            for (UUID nodeId : partMap.keySet()) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    throw new GridH2RetryException("Failed to find node.");
+
+                res.add(node);
+            }
+        }
+
+        if (F.isEmpty(res))
+            throw new GridH2RetryException("Failed to collect affinity nodes.");
+
+        return res;
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param qctx Query context.
+     * @param affKeyObj Affinity key.
+     * @return Cluster nodes or {@code null} if affinity key is a null value.
+     */
+    private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+        assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
+
+        ClusterNode node;
+
+        if (qctx.partitionsMap() != null) {
+            // If we have explicit partitions map, we have to use it to calculate affinity node.
+            UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+
+            node = cctx.discovery().node(nodeId);
+        }
+        else // Get primary node for current topology version.
+            node = cctx.affinity().primary(affKeyObj, qctx.topologyVersion());
+
+        if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+            throw new GridH2RetryException("Failed to find node.");
+
+        return node;
+    }
+
+    /**
+     * @param row Row.
+     * @return Row message.
+     */
+    private GridH2RowMessage toRowMessage(Row row) {
+        if (row == null)
+            return null;
+
+        int cols = row.getColumnCount();
+
+        assert cols > 0 : cols;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(cols);
+
+        for (int i = 0; i < cols; i++) {
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(row.getValue(i)));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * @param msg Row message.
+     * @return Search row.
+     */
+    private SearchRow toSearchRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        GridKernalContext ctx = kernalContext();
+
+        Value[] vals = new Value[getTable().getColumns().length];
+
+        assert vals.length > 0;
+
+        List<GridH2ValueMessage> msgVals = msg.values();
+
+        for (int i = 0; i < indexColumns.length; i++) {
+            if (i >= msgVals.size())
+                continue;
+
+            try {
+                vals[indexColumns[i].column.getColumnId()] = msgVals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return database.createRow(vals, MEMORY_CALCULATE);
+    }
+
+    /**
+     * @param row Search row.
+     * @return Row message.
+     */
+    private GridH2RowMessage toSearchRowMessage(SearchRow row) {
+        if (row == null)
+            return null;
+
+        List<GridH2ValueMessage> vals = new ArrayList<>(indexColumns.length);
+
+        for (IndexColumn idxCol : indexColumns) {
+            Value val = row.getValue(idxCol.column.getColumnId());
+
+            if (val == null)
+                break;
+
+            try {
+                vals.add(GridH2ValueMessageFactory.toMessage(val));
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        GridH2RowMessage res = new GridH2RowMessage();
+
+        res.values(vals);
+
+        return res;
+    }
+
+    /**
+     * @param msg Message.
+     * @return Row.
+     */
+    private Row toRow(GridH2RowMessage msg) {
+        if (msg == null)
+            return null;
+
+        GridKernalContext ctx = kernalContext();
+
+        List<GridH2ValueMessage> vals = msg.values();
+
+        assert !F.isEmpty(vals) : vals;
+
+        Value[] vals0 = new Value[vals.size()];
+
+        for (int i = 0; i < vals0.length; i++) {
+            try {
+                vals0[i] = vals.get(i).value(ctx);
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+        }
+
+        return database.createRow(vals0, MEMORY_CALCULATE);
+    }
+
+    /**
+     * Simple cursor from a single node.
+     */
+    private static class UnicastCursor implements Cursor {
+        /** */
+        final int rangeId;
+
+        /** */
+        RangeStream stream;
+
+        /**
+         * @param rangeId Range ID.
+         * @param nodes Remote nodes.
+         * @param rangeStreams Range streams.
+         */
+        private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
+            assert nodes.size() == 1;
+
+            this.rangeId = rangeId;
+            this.stream = rangeStreams.get(F.first(nodes));
+
+            assert stream != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            return stream.next(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return stream.get(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Merge cursor from multiple nodes.
+     */
+    private class BroadcastCursor implements Cursor, Comparator<RangeStream> {
+        /** */
+        final int rangeId;
+
+        /** */
+        final RangeStream[] streams;
+
+        /** */
+        boolean first = true;
+
+        /** */
+        int off;
+
+        /**
+         * @param rangeId Range ID.
+         * @param nodes Remote nodes.
+         * @param rangeStreams Range streams.
+         */
+        private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
+            assert nodes.size() > 1;
+
+            this.rangeId = rangeId;
+
+            streams = new RangeStream[nodes.size()];
+
+            int i = 0;
+
+            for (ClusterNode node : nodes) {
+                RangeStream stream = rangeStreams.get(node);
+
+                assert stream != null;
+
+                streams[i++] = stream;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(RangeStream o1, RangeStream o2) {
+            if (o1 == o2)
+                return 0;
+
+            // Nulls are at the beginning of array.
+            if (o1 == null)
+                return -1;
+
+            if (o2 == null)
+                return 1;
+
+            return compareRows(o1.get(rangeId), o2.get(rangeId));
+        }
+
+        /**
+         * Try to fetch the first row.
+         *
+         * @return {@code true} If we were able to find at least one row.
+         */
+        private boolean goFirst() {
+            // Fetch first row from all the streams and sort them.
+            for (int i = 0; i < streams.length; i++) {
+                if (!streams[i].next(rangeId)) {
+                    streams[i] = null;
+                    off++; // After sorting this offset will cut off all null elements at the beginning of array.
+                }
+            }
+
+            if (off == streams.length)
+                return false;
+
+            Arrays.sort(streams, this);
+
+            return true;
+        }
+
+        /**
+         * Fetch next row.
+         *
+         * @return {@code true} If we were able to find at least one row.
+         */
+        private boolean goNext() {
+            assert off != streams.length;
+
+            if (!streams[off].next(rangeId)) {
+                // Next row from current min stream was not found -> nullify that stream and bump offset forward.
+                streams[off] = null;
+
+                return ++off != streams.length;
+            }
+
+            // Bubble up current min stream with respect to fetched row to achieve correct sort order of streams.
+            for (int i = off, last = streams.length - 1; i < last; i++) {
+                if (compareRows(streams[i].get(rangeId), streams[i + 1].get(rangeId)) <= 0)
+                    break;
+
+                U.swap(streams, i, i + 1);
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (first) {
+                first = false;
+
+                return goFirst();
+            }
+
+            return goNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() {
+            return streams[off].get(rangeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public SearchRow getSearchRow() {
+            return get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean previous() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Index lookup batch.
+     */
+    private class DistributedLookupBatch implements IndexLookupBatch {
+        /** */
+        final GridCacheContext<?,?> cctx;
+
+        /** */
+        final boolean ucast;
+
+        /** */
+        final int affColId;
+
+        /** */
+        GridH2QueryContext qctx;
+
+        /** */
+        int batchLookupId;
+
+        /** */
+        Map<ClusterNode, RangeStream> rangeStreams;
+
+        /** */
+        List<ClusterNode> broadcastNodes;
+
+        /** */
+        final List<Future<Cursor>> res = new ArrayList<>();
+
+        /** */
+        boolean batchFull;
+
+        /** */
+        boolean findCalled;
+
+        /**
+         * @param cctx Cache Cache context.
+         * @param ucast Unicast or broadcast query.
+         * @param affColId Affinity column ID.
+         */
+        private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+            this.cctx = cctx;
+            this.ucast = ucast;
+            this.affColId = affColId;
+        }
+
+        /**
+         * @param firstRow First row.
+         * @param lastRow Last row.
+         * @return Affinity key or {@code null}.
+         */
+        private Object getAffinityKey(SearchRow firstRow, SearchRow lastRow) {
+            if (firstRow == null || lastRow == null)
+                return null;
+
+            Value affKeyFirst = firstRow.getValue(affColId);
+            Value affKeyLast = lastRow.getValue(affColId);
+
+            if (affKeyFirst != null && equal(affKeyFirst, affKeyLast))
+                return affKeyFirst == ValueNull.INSTANCE ? EXPLICIT_NULL : affKeyFirst.getObject();
+
+            if (affColId == KEY_COL)
+                return null;
+
+            // Try to extract affinity key from primary key.
+            Value pkFirst = firstRow.getValue(KEY_COL);
+            Value pkLast = lastRow.getValue(KEY_COL);
+
+            if (pkFirst == ValueNull.INSTANCE || pkLast == ValueNull.INSTANCE)
+                return EXPLICIT_NULL;
+
+            if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
+                return null;
+
+            Object pkAffKeyFirst;
+            Object pkAffKeyLast;
+
+            GridKernalContext ctx = kernalContext();
+
+            try {
+                pkAffKeyFirst = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject());
+                pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkLast.getObject());
+            }
+            catch (IgniteCheckedException e) {
+                throw new CacheException(e);
+            }
+
+            if (pkAffKeyFirst == null || pkAffKeyLast == null)
+                throw new CacheException("Cache key without affinity key.");
+
+            if (pkAffKeyFirst.equals(pkAffKeyLast))
+                return pkAffKeyFirst;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("ForLoopReplaceableByForEach")
+        @Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
+            if (findCalled) {
+                findCalled = false;
+
+                // Cleanup after the previous phase.
+                qctx.putStreams(batchLookupId, null);
+
+                // Reinitialize for the next lookup phase.
+                batchLookupId = qctx.nextBatchLookupId();
+                rangeStreams = new HashMap<>();
+                res.clear();
+            }
+
+            Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
+
+            List<ClusterNode> nodes;
+            Future<Cursor> fut;
+
+            if (affKey != null) {
+                // Affinity key is provided.
+                if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
+                    return false;
+
+                nodes = F.asList(rangeNode(cctx, qctx, affKey));
+            }
+            else {
+                // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
+                if (broadcastNodes == null)
+                    broadcastNodes = broadcastNodes(qctx, cctx);
+
+                nodes = broadcastNodes;
+            }
+
+            assert !F.isEmpty(nodes) : nodes;
+
+            final int rangeId = res.size();
+
+            // Create messages.
+            GridH2RowMessage first = toSearchRowMessage(firstRow);
+            GridH2RowMessage last = toSearchRowMessage(lastRow);
+
+            // Range containing upper and lower bounds.
+            GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
+
+            // Add range to every message of every participating node.
+            for (int i = 0; i < nodes.size(); i++) {
+                ClusterNode node = nodes.get(i);
+                assert node != null;
+
+                RangeStream stream = rangeStreams.get(node);
+
+                List<GridH2RowRangeBounds> bounds;
+
+                if (stream == null) {
+                    stream = new RangeStream(qctx, node);
+
+                    stream.req = createRequest(qctx, batchLookupId);
+                    stream.req.bounds(bounds = new ArrayList<>());
+
+                    rangeStreams.put(node, stream);
+                }
+                else
+                    bounds = stream.req.bounds();
+
+                bounds.add(rangeBounds);
+
+                // If at least one node will have a full batch then we are ok.
+                if (bounds.size() >= qctx.pageSize())
+                    batchFull = true;
+            }
+
+            fut = new DoneFuture<>(nodes.size() == 1 ?
+                new UnicastCursor(rangeId, nodes, rangeStreams) :
+                new BroadcastCursor(rangeId, nodes, rangeStreams));
+
+            res.add(fut);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isBatchFull() {
+            return batchFull;
+        }
+
+        /**
+         *
+         */
+        private void startStreams() {
+            if (rangeStreams.isEmpty()) {
+                assert res.isEmpty();
+
+                return;
+            }
+
+            qctx.putStreams(batchLookupId, rangeStreams);
+
+            // Start streaming.
+            for (RangeStream stream : rangeStreams.values())
+                stream.start();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<Future<Cursor>> find() {
+            batchFull = false;
+            findCalled = true;
+
+            startStreams();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset(boolean beforeQry) {
+            if (beforeQry) {
+                qctx = GridH2QueryContext.get();
+                batchLookupId = qctx.nextBatchLookupId();
+                rangeStreams = new HashMap<>();
+            }
+            else {
+                rangeStreams = null;
+                broadcastNodes = null;
+                batchFull = false;
+                findCalled = false;
+                res.clear();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getPlanSQL() {
+            return ucast ? "unicast" : "broadcast";
+        }
+    }
+
+    /**
+     * Per node range stream.
+     */
+    private class RangeStream {
+        /** */
+        final GridH2QueryContext qctx;
+
+        /** */
+        final ClusterNode node;
+
+        /** */
+        GridH2IndexRangeRequest req;
+
+        /** */
+        int remainingRanges;
+
+        /** */
+        final BlockingQueue<GridH2IndexRangeResponse> respQueue = new LinkedBlockingQueue<>();
+
+        /** */
+        Iterator<GridH2RowRange> ranges = emptyIterator();
+
+        /** */
+        Cursor cursor = GridH2Cursor.EMPTY;
+
+        /** */
+        int cursorRangeId = -1;
+
+        /**
+         * @param qctx Query context.
+         * @param node Node.
+         */
+        RangeStream(GridH2QueryContext qctx, ClusterNode node) {
+            this.node = node;
+            this.qctx = qctx;
+        }
+
+        /**
+         * Start streaming.
+         */
+        private void start() {
+            remainingRanges = req.bounds().size();
+
+            assert remainingRanges > 0;
+
+            if (log.isDebugEnabled())
+                log.debug("Starting stream: [node=" + node + ", req=" + req + "]");
+
+            send(singletonList(node), req);
+        }
+
+        /**
+         * @param msg Response.
+         */
+        public void onResponse(GridH2IndexRangeResponse msg) {
+            respQueue.add(msg);
+        }
+
+        /**
+         * @return Response.
+         */
+        private GridH2IndexRangeResponse awaitForResponse() {
+            assert remainingRanges > 0;
+
+            final long start = U.currentTimeMillis();
+
+            for (int attempt = 0;; attempt++) {
+                if (qctx.isCleared())
+                    throw new GridH2RetryException("Query is cancelled.");
+
+                if (kernalContext().isStopping())
+                    throw new GridH2RetryException("Stopping node.");
+
+                GridH2IndexRangeResponse res;
+
+                try {
+                    res = respQueue.poll(500, TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e) {
+                    throw new GridH2RetryException("Interrupted.");
+                }
+
+                if (res != null) {
+                    switch (res.status()) {
+                        case STATUS_OK:
+                            List<GridH2RowRange> ranges0 = res.ranges();
+
+                            remainingRanges -= ranges0.size();
+
+                            if (ranges0.get(ranges0.size() - 1).isPartial())
+                                remainingRanges++;
+
+                            if (remainingRanges > 0) {
+                                if (req.bounds() != null)
+                                    req = createRequest(qctx, req.batchLookupId());
+
+                                // Prefetch next page.
+                                send(singletonList(node), req);
+                            }
+                            else
+                                req = null;
+
+                            return res;
+
+                        case STATUS_NOT_FOUND:
+                            if (req == null || req.bounds() == null) // We have already received the first response.
+                                throw new GridH2RetryException("Failure on remote node.");
+
+                            if (U.currentTimeMillis() - start > 30_000)
+                                throw new GridH2RetryException("Timeout.");
+
+                            try {
+                                U.sleep(20 * attempt);
+                            }
+                            catch (IgniteInterruptedCheckedException e) {
+                                throw new IgniteInterruptedException(e.getMessage());
+                            }
+
+                            // Retry to send the request once more after some time.
+                            send(singletonList(node), req);
+
+                            break;
+
+                        case STATUS_ERROR:
+                            throw new CacheException(res.error());
+
+                        default:
+                            throw new IllegalStateException();
+                    }
+                }
+
+                if (!kernalContext().discovery().alive(node))
+                    throw new GridH2RetryException("Node left: " + node);
+            }
+        }
+
+        /**
+         * @param rangeId Requested range ID.
+         * @return {@code true} If next row for the requested range was found.
+         */
+        private boolean next(final int rangeId) {
+            for (;;) {
+                if (rangeId == cursorRangeId) {
+                    if (cursor.next())
+                        return true;
+                }
+                else if (rangeId < cursorRangeId)
+                    return false;
+
+                cursor = GridH2Cursor.EMPTY;
+
+                while (!ranges.hasNext()) {
+                    if (remainingRanges == 0) {
+                        ranges = emptyIterator();
+
+                        return false;
+                    }
+
+                    ranges = awaitForResponse().ranges().iterator();
+                }
+
+                GridH2RowRange range = ranges.next();
+
+                cursorRangeId = range.rangeId();
+
+                if (!F.isEmpty(range.rows())) {
+                    final Iterator<GridH2RowMessage> it = range.rows().iterator();
+
+                    if (it.hasNext()) {
+                        cursor = new GridH2Cursor(new Iterator<Row>() {
+                            @Override public boolean hasNext() {
+                                return it.hasNext();
+                            }
+
+                            @Override public Row next() {
+                                // Lazily convert messages into real rows.
+                                return toRow(it.next());
+                            }
+
+                            @Override public void remove() {
+                                throw new UnsupportedOperationException();
+                            }
+                        });
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param rangeId Requested range ID.
+         * @return Current row.
+         */
+        private Row get(int rangeId) {
+            assert rangeId == cursorRangeId;
+
+            return cursor.get();
+        }
+    }
+
+    /**
+     * Bounds iterator.
+     */
+    private class RangeSource {
+        /** */
+        Iterator<GridH2RowRangeBounds> boundsIter;
+
+        /** */
+        int curRangeId = -1;
+
+        /** */
+        Iterator<GridH2Row> curRange = emptyIterator();
+
+        /** */
+        final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+
+        /** */
+        final IndexingQueryFilter filter;
+
+        /**
+         * @param bounds Bounds.
+         * @param tree Snapshot.
+         * @param filter Filter.
+         */
+        RangeSource(
+            Iterable<GridH2RowRangeBounds> bounds,
+            ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
+            IndexingQueryFilter filter
+        ) {
+            this.filter = filter;
+            this.tree = tree;
+            boundsIter = bounds.iterator();
+        }
+
+        /**
+         * @return {@code true} If there are more rows in this source.
+         */
+        public boolean hasMoreRows() {
+            return boundsIter.hasNext() || curRange.hasNext();
+        }
+
+        /**
+         * @param maxRows Max allowed rows.
+         * @return Range.
+         */
+        public GridH2RowRange next(int maxRows) {
+            assert maxRows > 0 : maxRows;
+
+            for (;;) {
+                if (curRange.hasNext()) {
+                    // Here we are getting last rows from previously partially fetched range.
+                    List<GridH2RowMessage> rows = new ArrayList<>();
+
+                    GridH2RowRange nextRange = new GridH2RowRange();
+
+                    nextRange.rangeId(curRangeId);
+                    nextRange.rows(rows);
+
+                    do {
+                        rows.add(toRowMessage(curRange.next()));
+                    }
+                    while (rows.size() < maxRows && curRange.hasNext());
+
+                    if (curRange.hasNext())
+                        nextRange.setPartial();
+                    else
+                        curRange = emptyIterator();
+
+                    return nextRange;
+                }
+
+                curRange = emptyIterator();
+
+                if (!boundsIter.hasNext()) {
+                    boundsIter = emptyIterator();
+
+                    return null;
+                }
+
+                GridH2RowRangeBounds bounds = boundsIter.next();
+
+                curRangeId = bounds.rangeId();
+
+                SearchRow first = toSearchRow(bounds.first());
+                SearchRow last = toSearchRow(bounds.last());
+
+                ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+
+                curRange = doFind0(t, first, true, last, filter);
+
+                if (!curRange.hasNext()) {
+                    // We have to return empty range here.
+                    GridH2RowRange emptyRange = new GridH2RowRange();
+
+                    emptyRange.rangeId(curRangeId);
+
+                    return emptyRange;
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Snapshot for current thread if there is one.
+     */
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param t Tree.
+     * @param first Lower bound.
+     * @param includeFirst Whether lower bound should be inclusive.
+     * @param last Upper bound always inclusive.
+     * @param filter Filter.
+     * @return Iterator over rows in given range.
+     */
+    protected Iterator<GridH2Row> doFind0(ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t,
+        @Nullable SearchRow first,
+        boolean includeFirst,
+        @Nullable SearchRow last,
+        IndexingQueryFilter filter) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Iterator which filters by expiration time and predicate.
      */
@@ -267,8 +1481,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
          * @param qryFilter Filter.
          * @param spaceName Space name.
          */
-        protected FilteringIterator(Iterator<GridH2Row> iter, long time,
-            IndexingQueryFilter qryFilter, String spaceName) {
+        protected FilteringIterator(Iterator<GridH2Row> iter,
+            long time,
+            IndexingQueryFilter qryFilter,
+            String spaceName) {
             super(iter);
 
             this.time = time;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e48c1d49/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index d240c40..53c2ed6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -113,28 +113,30 @@ public class GridH2Table extends TableBase {
         this.desc = desc;
         this.spaceName = spaceName;
 
-        boolean affinityColExists = true;
-
         if (desc != null && desc.context() != null) {
-            String affKey = desc.type().affinityKey();
+            if (!desc.context().customAffinityMapper()) {
+                boolean affinityColExists = true;
+
+                String affKey = desc.type().affinityKey();
 
-            int affKeyColId = -1;
+                int affKeyColId = -1;
 
-            if (affKey != null) {
-                String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase();
+                if (affKey != null) {
+                    String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase();
 
-                if (doesColumnExist(colName))
-                    affKeyColId = getColumn(colName).getColumnId();
+                    if (doesColumnExist(colName))
+                        affKeyColId = getColumn(colName).getColumnId();
+                    else
+                        affinityColExists = false;
+                }
                 else
-                    affinityColExists = false;
-            }
-            else
-                affKeyColId = KEY_COL;
+                    affKeyColId = KEY_COL;
 
-            if (affinityColExists) {
-                affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
+                if (affinityColExists) {
+                    affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
 
-                assert affKeyCol != null;
+                    assert affKeyCol != null;
+                }
             }
         }
 
@@ -909,7 +911,7 @@ public class GridH2Table extends TableBase {
 
         /** {@inheritDoc} */
         @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
-            return delegate.find(filter, first, last);
+            return find(filter.getSession(), first, last);
         }
 
         /** {@inheritDoc} */


Mime
View raw message