ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/12] ignite git commit: Implemented.
Date Wed, 22 Feb 2017 10:18:06 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1.9 963a9d9fe -> 658b4ad5a


http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 914e0da..0829df0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import java.lang.reflect.Field;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -43,18 +46,36 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Base class for snapshotable tree indexes.
+ * Base class for snapshotable segmented tree indexes.
  */
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
+
     /** */
-    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+    private static Field KEY_FIELD;
+
+    /** */
+    static {
+        try {
+            KEY_FIELD = GridH2AbstractKeyValueRow.class.getDeclaredField("key");
+            KEY_FIELD.setAccessible(true);
+        }
+        catch (NoSuchFieldException e) {
+            KEY_FIELD = null;
+        }
+    }
+
+    /** Index segments. */
+    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>[] segments;
 
     /** */
     private final boolean snapshotEnabled;
 
+    /** */
+    private final GridH2RowDescriptor desc;
+
     /**
-     * Constructor with index initialization.
+     * Constructor with index initialization. Creates index with single segment.
      *
      * @param name Index name.
      * @param tbl Table.
@@ -63,35 +84,59 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      */
     @SuppressWarnings("unchecked")
     public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) {
+        this(name, tbl, pk, colsList, 1);
+    }
+
+    /**
+     * Constructor with index initialization.
+     *
+     * @param name Index name.
+     * @param tbl Table.
+     * @param pk If this index is primary key.
+     * @param colsList Index columns list.
+     * @param segmentsCnt Number of segments.
+     */
+    @SuppressWarnings("unchecked")
+    public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList, int segmentsCnt) {
+        assert segmentsCnt > 0 : segmentsCnt;
+
         IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
 
         IndexColumn.mapColumns(cols, tbl);
 
+        desc = tbl.rowDescriptor();
+
         initBaseIndex(tbl, 0, name, cols,
             pk ? IndexType.createPrimaryKey(false, false) : IndexType.createNonUnique(false, false, false));
 
+        segments = new ConcurrentNavigableMap[segmentsCnt];
+
         final GridH2RowDescriptor desc = tbl.rowDescriptor();
 
         if (desc == null || desc.memory() == null) {
             snapshotEnabled = desc == null || desc.snapshotableIndex();
 
             if (snapshotEnabled) {
-                tree = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
-                    @Override protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
-                        if (val != null)
-                            node.key = (GridSearchRowPointer)val;
-                    }
+                for (int i = 0; i < segmentsCnt; i++) {
+                    segments[i] = new SnapTreeMap<GridSearchRowPointer, GridH2Row>(this) {
+                        @Override
+                        protected void afterNodeUpdate_nl(Node<GridSearchRowPointer, GridH2Row> node, Object val) {
+                            if (val != null)
+                                node.key = (GridSearchRowPointer)val;
+                        }
 
-                    @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
-                        if (key instanceof ComparableRow)
-                            return (Comparable<? super SearchRow>)key;
+                        @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+                            if (key instanceof ComparableRow)
+                                return (Comparable<? super SearchRow>)key;
 
-                        return super.comparable(key);
-                    }
-                };
+                            return super.comparable(key);
+                        }
+                    };
+                }
             }
             else {
-                tree = new ConcurrentSkipListMap<>(
+                for (int i = 0; i < segmentsCnt; i++) {
+                    segments[i] = new ConcurrentSkipListMap<>(
                         new Comparator<GridSearchRowPointer>() {
                             @Override public int compare(GridSearchRowPointer o1, GridSearchRowPointer o2) {
                                 if (o1 instanceof ComparableRow)
@@ -103,7 +148,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                                 return compareRows(o1, o2);
                             }
                         }
-                );
+                    );
+                }
             }
         }
         else {
@@ -111,28 +157,30 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             snapshotEnabled = true;
 
-            tree = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
-                @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
-                    final long oldKey = keyPtr(node);
+            for (int i = 0; i < segmentsCnt; i++) {
+                segments[i] = new GridOffHeapSnapTreeMap<GridSearchRowPointer, GridH2Row>(desc, desc, desc.memory(), desc.guard(), this) {
+                    @Override protected void afterNodeUpdate_nl(long node, GridH2Row val) {
+                        final long oldKey = keyPtr(node);
 
-                    if (val != null) {
-                        key(node, val);
+                        if (val != null) {
+                            key(node, val);
 
-                        guard.finalizeLater(new Runnable() {
-                            @Override public void run() {
-                                desc.createPointer(oldKey).decrementRefCount();
-                            }
-                        });
+                            guard.finalizeLater(new Runnable() {
+                                @Override public void run() {
+                                    desc.createPointer(oldKey).decrementRefCount();
+                                }
+                            });
+                        }
                     }
-                }
 
-                @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
-                    if (key instanceof ComparableRow)
-                        return (Comparable<? super SearchRow>)key;
+                    @Override protected Comparable<? super GridSearchRowPointer> comparable(Object key) {
+                        if (key instanceof ComparableRow)
+                            return (Comparable<? super SearchRow>)key;
 
-                    return super.comparable(key);
-                }
-            };
+                        return super.comparable(key);
+                    }
+                };
+            }
         }
 
         initDistributedJoinMessaging(tbl);
@@ -142,20 +190,24 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override protected Object doTakeSnapshot() {
         assert snapshotEnabled;
 
+        int seg = threadLocalSegment();
+
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = segments[seg];
+
         return tree instanceof SnapTreeMap ?
             ((SnapTreeMap)tree).clone() :
             ((GridOffHeapSnapTreeMap)tree).clone();
     }
 
     /** {@inheritDoc} */
-    protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int seg) {
         if (!snapshotEnabled)
-            return tree;
+            return segments[seg];
 
         ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot();
 
         if (res == null)
-            res = tree;
+            return segments[seg];
 
         return res;
     }
@@ -164,19 +216,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override public void destroy() {
         assert threadLocalSnapshot() == null;
 
-        if (tree instanceof AutoCloseable)
-            U.closeQuiet((AutoCloseable)tree);
+        for (int i = 0; i < segments.length; i++) {
+            if (segments[i] instanceof AutoCloseable)
+                U.closeQuiet((AutoCloseable)segments[i]);
+        }
 
         super.destroy();
     }
 
+
+    /** {@inheritDoc} */
+    protected int threadLocalSegment() {
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        if(segments.length == 1)
+            return 0;
+
+        if(qctx == null)
+            throw new IllegalStateException("GridH2QueryContext is not initialized.");
+
+        return qctx.segment();
+    }
+
     /** {@inheritDoc} */
     @Override public long getRowCount(@Nullable Session ses) {
         IndexingQueryFilter f = threadLocalFilter();
 
+        int seg = threadLocalSegment();
+
         // Fast path if we don't need to perform any filtering.
         if (f == null || f.forSpace((getTable()).spaceName()) == null)
-            return treeForRead().size();
+            return treeForRead(seg).size();
 
         Iterator<GridH2Row> iter = doFind(null, false, null);
 
@@ -255,7 +325,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * @return Row.
      */
     public GridH2Row findOne(GridSearchRowPointer row) {
-        return tree.get(row);
+        int seg = threadLocalSegment();
+
+        return segments[seg].get(row);
     }
 
     /**
@@ -268,7 +340,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      */
     @SuppressWarnings("unchecked")
     private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) {
-        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
+        int seg = threadLocalSegment();
+
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(seg);
 
         return doFind0(t, first, includeFirst, last, threadLocalFilter());
     }
@@ -359,12 +433,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
     /** {@inheritDoc} */
     @Override public GridH2Row put(GridH2Row row) {
-        return tree.put(row, row);
+        int seg = segment(row);
+
+        return segments[seg].put(row, row);
     }
 
     /** {@inheritDoc} */
     @Override public GridH2Row remove(SearchRow row) {
-        return tree.remove(comparable(row, 0));
+        GridSearchRowPointer comparable = comparable(row, 0);
+
+        int seg = segment(row);
+
+        return segments[seg].remove(comparable);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int segmentsCount() {
+        return segments.length;
+    }
+
+    /**
+     * @param partition Partition index.
+     * @return Index segment ID for the given partition.
+     */
+    protected int segment(int partition) {
+        return partition % segments.length;
+    }
+
+    /**
+     * @param row Search row.
+     * @return Index segment ID for the given row.
+     */
+    private int segment(SearchRow row) {
+        assert row != null;
+
+        CacheObject key;
+
+        if (desc != null && desc.context() != null) {
+            GridCacheContext<?, ?> ctx = desc.context();
+
+            assert ctx != null;
+
+            if (row instanceof GridH2AbstractKeyValueRow && KEY_FIELD != null) {
+                try {
+                    Object o = KEY_FIELD.get(row);
+
+                    if (o instanceof CacheObject)
+                        key = (CacheObject)o;
+                    else
+                        key = ctx.toCacheKeyObject(o);
+
+                }
+                catch (IllegalAccessException e) {
+                    throw new IllegalStateException(e);
+                }
+            }
+            else
+                key = ctx.toCacheKeyObject(row.getValue(0));
+
+            return segment(ctx.affinity().partition(key));
+        }
+        else
+            return 0;
     }
 
     /**
@@ -463,18 +593,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         IndexColumn[] cols = getIndexColumns();
 
         GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(),
-            getIndexType().isUnique(), F.asList(cols));
+            getIndexType().isUnique(), F.asList(cols), segments.length);
 
         Thread thread = Thread.currentThread();
 
-        long i = 0;
+        long j = 0;
 
-        for (GridH2Row row : tree.values()) {
-            // Check for interruptions every 1000 iterations.
-            if (++i % 1000 == 0 && thread.isInterrupted())
-                throw new InterruptedException();
+        for (int i = 0; i < segments.length; i++) {
+            for (GridH2Row row : segments[i].values()) {
+                // Check for interruptions every 1024 iterations.
+                if ((++j & 1023) == 0 && thread.isInterrupted())
+                    throw new InterruptedException();
 
-            idx.tree.put(row, row);
+                idx.put(row);
+            }
         }
 
         return idx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..5027c9a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import javax.cache.CacheException;
@@ -56,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -84,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@ -161,8 +165,7 @@ public class GridMapQueryExecutor {
                 if (nodeRess == null)
                     return;
 
-                for (QueryResults ress : nodeRess.results().values())
-                    ress.cancel(true);
+                nodeRess.cancelAll();
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
@@ -235,12 +238,7 @@ public class GridMapQueryExecutor {
             GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
         }
 
-        QueryResults results = nodeRess.results().remove(qryReqId);
-
-        if (results == null)
-            return;
-
-        results.cancel(true);
+        nodeRess.cancelRequest(qryReqId);
     }
 
     /**
@@ -427,6 +425,7 @@ public class GridMapQueryExecutor {
 
         onQueryRequest0(node,
             req.requestId(),
+            0,
             req.queries(),
             cacheIds,
             req.topologyVersion(),
@@ -434,7 +433,7 @@ public class GridMapQueryExecutor {
             req.partitions(),
             null,
             req.pageSize(),
-            false,
+            OFF,
             req.timeout());
     }
 
@@ -442,12 +441,49 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param req Query request.
      */
-    private void onQueryRequest(ClusterNode node, GridH2QueryRequest req) {
-        Map<UUID,int[]> partsMap = req.partitions();
-        int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+    private void onQueryRequest(final ClusterNode node, final GridH2QueryRequest req) throws IgniteCheckedException {
+        final Map<UUID,int[]> partsMap = req.partitions();
+        final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
+
+        assert req.caches() != null && !req.caches().isEmpty();
+
+        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
+
+        if (mainCctx == null)
+            throw new CacheException("Failed to find cache.");
+
+        final DistributedJoinMode joinMode = distributedJoinMode(
+            req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
+            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
+
+        for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+            final int segment = i;
+
+            ctx.closure().callLocal(
+                new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        onQueryRequest0(node,
+                            req.requestId(),
+                            segment,
+                            req.queries(),
+                            req.caches(),
+                            req.topologyVersion(),
+                            partsMap,
+                            parts,
+                            req.tables(),
+                            req.pageSize(),
+                            joinMode,
+                            req.timeout());
+
+                        return null;
+                    }
+                }
+                , QUERY_POOL);
+        }
 
         onQueryRequest0(node,
             req.requestId(),
+            0,
             req.queries(),
             req.caches(),
             req.topologyVersion(),
@@ -455,13 +491,14 @@ public class GridMapQueryExecutor {
             parts,
             req.tables(),
             req.pageSize(),
-            req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+            joinMode,
             req.timeout());
     }
 
     /**
      * @param node Node authored request.
      * @param reqId Request ID.
+     * @param segmentId Index segment ID.
      * @param qrys Queries to execute.
      * @param cacheIds Caches which will be affected by these queries.
      * @param topVer Topology version.
@@ -469,11 +506,12 @@ public class GridMapQueryExecutor {
      * @param parts Explicit partitions for current node.
      * @param tbls Tables.
      * @param pageSize Page size.
-     * @param distributedJoins Can we expect distributed joins to be ran.
+     * @param distributedJoinMode Query distributed join mode.
      */
     private void onQueryRequest0(
         ClusterNode node,
         long reqId,
+        int segmentId,
         Collection<GridCacheSqlQuery> qrys,
         List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
@@ -481,7 +519,7 @@ public class GridMapQueryExecutor {
         int[] parts,
         Collection<String> tbls,
         int pageSize,
-        boolean distributedJoins,
+        DistributedJoinMode distributedJoinMode,
         int timeout
     ) {
         // Prepare to run queries.
@@ -500,7 +538,7 @@ public class GridMapQueryExecutor {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
                 if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
-                    sendRetry(node, reqId);
+                    sendRetry(node, reqId, segmentId);
 
                     return;
                 }
@@ -508,17 +546,18 @@ public class GridMapQueryExecutor {
 
             qr = new QueryResults(reqId, qrys.size(), mainCctx);
 
-            if (nodeRess.results().put(reqId, qr) != null)
+            if (nodeRess.put(reqId, segmentId, qr) != null)
                 throw new IllegalStateException();
 
             // Prepare query context.
             GridH2QueryContext qctx = new GridH2QueryContext(ctx.localNodeId(),
                 node.id(),
                 reqId,
+                segmentId,
                 mainCctx.isReplicated() ? REPLICATED : MAP)
                 .filter(h2.backupFilter(topVer, parts))
                 .partitionsMap(partsMap)
-                .distributedJoins(distributedJoins)
+                .distributedJoinMode(distributedJoinMode)
                 .pageSize(pageSize)
                 .topologyVersion(topVer)
                 .reservations(reserved);
@@ -542,7 +581,7 @@ public class GridMapQueryExecutor {
             Connection conn = h2.connectionForSpace(mainCctx.name());
 
             // Here we enforce join order to have the same behavior on all the nodes.
-            h2.setupConnection(conn, distributedJoins, true);
+            h2.setupConnection(conn, distributedJoinMode != OFF, true);
 
             GridH2QueryContext.set(qctx);
 
@@ -553,13 +592,13 @@ public class GridMapQueryExecutor {
                 if (nodeRess.cancelled(reqId)) {
                     GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 
-                    nodeRess.results().remove(reqId);
+                    nodeRess.cancelRequest(reqId);
 
                     throw new QueryCancelledException();
                 }
 
                 // Run queries.
-                int i = 0;
+                int qryIdx = 0;
 
                 boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
@@ -567,7 +606,7 @@ public class GridMapQueryExecutor {
                     ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
                         F.asList(qry.parameters()), true,
                         timeout,
-                        qr.cancels[i]);
+                        qr.cancels[qryIdx]);
 
                     if (evt) {
                         ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -587,24 +626,24 @@ public class GridMapQueryExecutor {
 
                     assert rs instanceof JdbcResultSet : rs.getClass();
 
-                    qr.addResult(i, qry, node.id(), rs);
+                    qr.addResult(qryIdx, qry, node.id(), rs);
 
                     if (qr.canceled) {
-                        qr.result(i).close();
+                        qr.result(qryIdx).close();
 
                         throw new QueryCancelledException();
                     }
 
                     // Send the first page.
-                    sendNextPage(nodeRess, node, qr, i, pageSize);
+                    sendNextPage(nodeRess, node, qr, qryIdx, segmentId, pageSize);
 
-                    i++;
+                    qryIdx++;
                 }
             }
             finally {
                 GridH2QueryContext.clearThreadLocal();
 
-                if (!distributedJoins)
+                if (distributedJoinMode == OFF)
                     qctx.clearContext(false);
 
                 if (!F.isEmpty(snapshotedTbls)) {
@@ -615,13 +654,13 @@ public class GridMapQueryExecutor {
         }
         catch (Throwable e) {
             if (qr != null) {
-                nodeRess.results().remove(reqId, qr);
+                nodeRess.remove(reqId, segmentId, qr);
 
                 qr.cancel(false);
             }
 
             if (X.hasCause(e, GridH2RetryException.class))
-                sendRetry(node, reqId);
+                sendRetry(node, reqId, segmentId);
             else {
                 U.error(log, "Failed to execute local query.", e);
 
@@ -681,14 +720,14 @@ public class GridMapQueryExecutor {
             return;
         }
 
-        QueryResults qr = nodeRess.results().get(req.queryRequestId());
+        QueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
 
         if (qr == null)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
         else if (qr.canceled)
             sendError(node, req.queryRequestId(), new QueryCancelledException());
         else
-            sendNextPage(nodeRess, node, qr, req.query(), req.pageSize());
+            sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
     }
 
     /**
@@ -696,9 +735,10 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param qr Query results.
      * @param qry Query.
+     * @param segmentId Index segment ID.
      * @param pageSize Page size.
      */
-    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
+    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry, int segmentId,
         int pageSize) {
         QueryResult res = qr.result(qry);
 
@@ -714,14 +754,14 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.results().remove(qr.qryReqId, qr);
+                nodeRess.remove(qr.qryReqId, segmentId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
-            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCnt : -1 ,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, segmentId, qry, page,
+                page == 0 ? res.rowCnt : -1,
                 res.cols,
                 loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
                 loc ? rows : null);
@@ -741,12 +781,13 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param reqId Request ID.
+     * @param segmentId Index segment ID.
      */
-    private void sendRetry(ClusterNode node, long reqId) {
+    private void sendRetry(ClusterNode node, long reqId, int segmentId) {
         try {
             boolean loc = node.isLocal();
 
-            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId, segmentId,
             /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
                 loc ? null : Collections.<Message>emptyList(),
                 loc ? Collections.<Value[]>emptyList() : null);
@@ -780,35 +821,118 @@ public class GridMapQueryExecutor {
      */
     private static class NodeResults {
         /** */
-        private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+        private final ConcurrentMap<RequestKey, QueryResults> res = new ConcurrentHashMap8<>();
 
         /** */
         private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
             new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
 
         /**
-         * @return All results.
+         * @param reqId Query Request ID.
+         * @return {@code True} if query was already cancelled.
          */
-        ConcurrentMap<Long, QueryResults> results() {
-            return res;
+        boolean cancelled(long reqId) {
+            return qryHist.get(reqId) != null;
         }
 
         /**
-         * @param qryId Query ID.
-         * @return {@code False} if query was already cancelled.
+         * @param reqId Query Request ID.
+         * @return {@code True} if this call was the first to mark the request as cancelled.
          */
-        boolean cancelled(long qryId) {
-            return qryHist.get(qryId) != null;
+        boolean onCancel(long reqId) {
+            Boolean old = qryHist.putIfAbsent(reqId, Boolean.FALSE);
+
+            return old == null;
         }
 
         /**
-         * @param qryId Query ID.
-         * @return {@code True} if cancelled.
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @return Partial query results for the segment, or {@code null} if none are registered.
          */
-        boolean onCancel(long qryId) {
-            Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+        public QueryResults get(long reqId, int segmentId) {
+            return res.get(new RequestKey(reqId, segmentId));
+        }
 
-            return old == null;
+        /**
+         * Removes and cancels all query results registered for the given request.
+         * @param reqID Request ID.
+         */
+        public void cancelRequest(long reqID) {
+            for (RequestKey key : res.keySet()) {
+                if (key.reqId == reqID) {
+                    QueryResults removed = res.remove(key);
+
+                    if (removed != null)
+                        removed.cancel(true);
+                }
+
+            }
+        }
+
+        /**
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @param qr Query Results.
+         * @return {@code True} if removed.
+         */
+        public boolean remove(long reqId, int segmentId, QueryResults qr) {
+            return res.remove(new RequestKey(reqId, segmentId), qr);
+        }
+
+        /**
+         * @param reqId Query Request ID.
+         * @param segmentId Index segment ID.
+         * @param qr Query Results.
+         * @return previous value.
+         */
+        public QueryResults put(long reqId, int segmentId, QueryResults qr) {
+            return res.put(new RequestKey(reqId, segmentId), qr);
+        }
+
+        /**
+         * Cancel all node queries.
+         */
+        public void cancelAll() {
+            for (QueryResults ress : res.values())
+                ress.cancel(true);
+        }
+
+        /**
+         * Composite map key: query request ID plus index segment ID.
+         */
+        private static class RequestKey {
+            /** */
+            private long reqId;
+
+            /** */
+            private int segmentId;
+
+            /** Constructor */
+            RequestKey(long reqId, int segmentId) {
+                this.reqId = reqId;
+                this.segmentId = segmentId;
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean equals(Object o) {
+                if (this == o)
+                    return true;
+                if (o == null || getClass() != o.getClass())
+                    return false;
+
+                RequestKey other = (RequestKey)o;
+
+                return reqId == other.reqId && segmentId == other.segmentId;
+
+            }
+
+            /** {@inheritDoc} */
+            @Override public int hashCode() {
+                int result = (int)(reqId ^ (reqId >>> 32));
+                result = 31 * result + segmentId;
+                return result;
+            }
         }
     }
 
@@ -836,7 +960,8 @@ public class GridMapQueryExecutor {
          * @param qrys Number of queries.
          * @param cctx Cache context.
          */
-        private QueryResults(long qryReqId, int qrys, GridCacheContext<?,?> cctx) {
+        @SuppressWarnings("unchecked")
+        private QueryResults(long qryReqId, int qrys, GridCacheContext<?, ?> cctx) {
             this.qryReqId = qryReqId;
             this.cctx = cctx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index c267f4a..45d3c58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -59,7 +59,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
     /** Remaining rows per source node ID. */
-    private Map<UUID, Counter> remainingRows;
+    private Map<UUID, Counter[]> remainingRows;
 
     /** */
     private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -141,15 +141,22 @@ public abstract class GridMergeIndex extends BaseIndex {
      * Set source nodes.
      *
      * @param nodes Nodes.
+     * @param segmentsCnt Index segments per table.
      */
-    public void setSources(Collection<ClusterNode> nodes) {
+    public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
         assert remainingRows == null;
 
         remainingRows = U.newHashMap(nodes.size());
 
         for (ClusterNode node : nodes) {
-            if (remainingRows.put(node.id(), new Counter()) != null)
+            // Each source node gets one counter per index segment.
+            Counter[] counters = new Counter[segmentsCnt];
+
+            for (int i = 0; i < segmentsCnt; i++)
+                counters[i] = new Counter();
+
+            // A non-null previous mapping means the same node ID was passed more than once.
+            if (remainingRows.put(node.id(), counters) != null)
                 throw new IllegalStateException("Duplicate node id: " + node.id());
+
         }
     }
 
@@ -194,7 +201,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     public final void addPage(GridResultPage page) {
         int pageRowsCnt = page.rowsInPage();
 
-        Counter cnt = remainingRows.get(page.source());
+        Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
 
         // RemainingRowsCount should be updated before page adding to avoid race
         // in GridMergeIndexUnsorted cursor iterator
@@ -231,13 +238,14 @@ public abstract class GridMergeIndex extends BaseIndex {
             // Guarantee that finished state possible only if counter is zero and all pages was added
             cnt.state = State.FINISHED;
 
-            for (Counter c : remainingRows.values()) { // Check all the sources.
-                if (c.state != State.FINISHED)
-                    return;
+            for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
+                for(int i = 0; i < cntrs.length; i++) {
+                    if (cntrs[i].state != State.FINISHED)
+                        return;
+                }
             }
 
             if (lastSubmitted.compareAndSet(false, true)) {
-                // Add page-marker that last page was added
                 addPage0(new GridResultPage(null, page.source(), null) {
                     @Override public boolean isLast() {
                         return true;
@@ -256,7 +264,20 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param page Page.
      */
     protected void fetchNextPage(GridResultPage page) {
-        if (remainingRows.get(page.source()).get() != 0)
+        assert !page.isLast();
+
+        if(page.isFail())
+            page.fetchNextPage(); // Rethrows exceptions for a failed page.
+
+        assert page.res != null;
+
+        // Counters are kept per source node and, within a node, per index segment.
+        Counter[] counters = remainingRows.get(page.source());
+
+        int segId = page.res.segmentId();
+
+        Counter counter = counters[segId];
+
+        // Ask for the next page only while this segment's counter is non-zero.
+        if (counter.get() != 0)
             page.fetchNextPage();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f54fab6..8837046 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
@@ -100,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 
 /**
@@ -294,6 +295,7 @@ public class GridReduceQueryExecutor {
     private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
         final long qryReqId = msg.queryRequestId();
         final int qry = msg.query();
+        final int seg = msg.segmentId();
 
         final QueryRun r = runs.get(qryReqId);
 
@@ -326,7 +328,7 @@ public class GridReduceQueryExecutor {
                     }
 
                     try {
-                        GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+                        GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, seg, pageSize);
 
                         if (node.isLocal())
                             h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
@@ -514,29 +516,33 @@ public class GridReduceQueryExecutor {
             // Explicit partition mapping for unstable topology.
             Map<ClusterNode, IntArray> partsMap = null;
 
-            if (isPreloadingActive(cctx, extraSpaces)) {
-                if (cctx.isReplicated())
-                    nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
-                else {
-                    partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+            if (qry.isLocal())
+                nodes = Collections.singleton(ctx.discovery().localNode());
+            else {
+                if (isPreloadingActive(cctx, extraSpaces)) {
+                    if (cctx.isReplicated())
+                        nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+                    else {
+                        partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
 
-                    nodes = partsMap == null ? null : partsMap.keySet();
+                        nodes = partsMap == null ? null : partsMap.keySet();
+                    }
                 }
-            }
-            else
-                nodes = stableDataNodes(topVer, cctx, extraSpaces);
+                else
+                    nodes = stableDataNodes(topVer, cctx, extraSpaces);
 
-            if (nodes == null)
-                continue; // Retry.
+                if (nodes == null)
+                    continue; // Retry.
 
-            assert !nodes.isEmpty();
+                assert !nodes.isEmpty();
 
-            if (cctx.isReplicated() || qry.explain()) {
-                assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
-                    "We must be on a client node.";
+                if (cctx.isReplicated() || qry.explain()) {
+                    assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
+                        "We must be on a client node.";
 
-                // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
-                nodes = Collections.singleton(F.rand(nodes));
+                    // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+                    nodes = Collections.singleton(F.rand(nodes));
+                }
             }
 
             final Collection<ClusterNode> finalNodes = nodes;
@@ -545,6 +551,8 @@ public class GridReduceQueryExecutor {
 
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
 
+            final int segmentsPerIndex = cctx.config().getQueryParallelism();
+
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeIndex idx;
 
@@ -565,12 +573,12 @@ public class GridReduceQueryExecutor {
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
 
-                idx.setSources(nodes);
+                idx.setSources(nodes, segmentsPerIndex);
 
                 r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+            r.latch = new CountDownLatch(r.idxs.size() * nodes.size() * segmentsPerIndex);
 
             runs.put(qryReqId, r);
 
@@ -626,14 +634,13 @@ public class GridReduceQueryExecutor {
                             .tables(distributedJoins ? qry.tables() : null)
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
-                            .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+                            .flags((qry.isLocal() ? GridH2QueryRequest.FLAG_IS_LOCAL : 0) |
+                                (distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0))
                             .timeout(timeoutMillis),
                     oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
-                    distributedJoins)
-                    ) {
-                    awaitAllReplies(r, nodes);
+                    false)) {
 
-                    cancel.checkCancelled();
+                    awaitAllReplies(r, nodes, cancel);
 
                     Object state = r.state.get();
 
@@ -696,7 +703,7 @@ public class GridReduceQueryExecutor {
                         h2.setupConnection(r.conn, false, enforceJoinOrder);
 
                         GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
-                            .pageSize(r.pageSize).distributedJoins(false));
+                            .pageSize(r.pageSize).distributedJoinMode(OFF));
 
                         try {
                             if (qry.explain())
@@ -817,11 +824,15 @@ public class GridReduceQueryExecutor {
     /**
      * @param r Query run.
      * @param nodes Nodes to check periodically if they alive.
+     * @param cancel Query cancel.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
-        throws IgniteInterruptedCheckedException {
+    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+        throws IgniteInterruptedCheckedException, QueryCancelledException {
         while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+
+            cancel.checkCancelled();
+
             for (ClusterNode node : nodes) {
                 if (!ctx.discovery().alive(node)) {
                     handleNodeLeft(r, node.id());

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
index e49c48f..b2548cc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
@@ -38,6 +38,12 @@ public class GridH2IndexRangeRequest implements Message {
     private long qryId;
 
     /** */
+    private int originSegmentId;
+
+    /** */
+    private int segmentId;
+
+    /** */
     private int batchLookupId;
 
     /** */
@@ -87,6 +93,34 @@ public class GridH2IndexRangeRequest implements Message {
     }
 
     /**
+     * @param segmentId Index segment ID.
+     */
+    public void segment(int segmentId) {
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * @return Index segment ID.
+     */
+    public int segment() {
+        return segmentId;
+    }
+
+    /**
+     * @return Origin index segment ID.
+     */
+    public int originSegmentId() {
+        return originSegmentId;
+    }
+
+    /**
+     * @param segmentId Origin index segment ID.
+     */
+    public void originSegmentId(int segmentId) {
+        this.originSegmentId = segmentId;
+    }
+
+    /**
      * @param batchLookupId Batch lookup ID.
      */
     public void batchLookupId(int batchLookupId) {
@@ -136,6 +170,15 @@ public class GridH2IndexRangeRequest implements Message {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                // Advance the state so that a write resumed at the next field does not emit segmentId again.
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeInt("originSegId", originSegmentId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -181,6 +224,21 @@ public class GridH2IndexRangeRequest implements Message {
 
                 reader.incrementState();
 
+            case 4:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                originSegmentId = reader.readInt("originSegId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2IndexRangeRequest.class);
@@ -193,7 +251,7 @@ public class GridH2IndexRangeRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 6; // Two fields added by this change: segmentId and originSegmentId.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
index c6414bd..4d3db12 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
@@ -47,6 +47,12 @@ public class GridH2IndexRangeResponse implements Message {
     private long qryId;
 
     /** */
+    private int segmentId;
+
+    /** */
+    private int originSegmentId;
+
+    /** */
     private int batchLookupId;
 
     /** */
@@ -130,6 +136,34 @@ public class GridH2IndexRangeResponse implements Message {
     }
 
     /**
+     * @param segmentId Index segment ID.
+     */
+    public void segment(int segmentId) {
+        this.segmentId = segmentId;
+    }
+
+    /**
+     * @return Index segment ID.
+     */
+    public int segment() {
+        return segmentId;
+    }
+
+    /**
+     * @return Origin index segment ID.
+     */
+    public int originSegmentId() {
+        return originSegmentId;
+    }
+
+    /**
+     * @param segmentId Origin index segment ID.
+     */
+    public void originSegmentId(int segmentId) {
+        this.originSegmentId = segmentId;
+    }
+
+    /**
      * @param batchLookupId Batch lookup ID.
      */
     public void batchLookupId(int batchLookupId) {
@@ -191,6 +225,17 @@ public class GridH2IndexRangeResponse implements Message {
 
                 writer.incrementState();
 
+            case 6:
+                if (!writer.writeInt("originSegId", originSegmentId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -252,6 +297,21 @@ public class GridH2IndexRangeResponse implements Message {
 
                 reader.incrementState();
 
+            case 6:
+                originSegmentId = reader.readInt("originSegId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridH2IndexRangeResponse.class);
@@ -264,7 +324,7 @@ public class GridH2IndexRangeResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 8; // Two fields added by this change: originSegmentId and segmentId.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 884173f..ec49aff 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      */
     public static int FLAG_DISTRIBUTED_JOINS = 1;
 
+    /**
+     * Restrict distributed joins range-requests to local index segments.
+     * Range requests to other nodes will not be sent.
+     */
+    public static final int FLAG_IS_LOCAL = 2;
+
     /** */
     private long reqId;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
new file mode 100644
index 0000000..f8c9dd5
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for correct distributed queries with index consisted of many segments.
+ */
+public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static int QRY_PARALLELISM_LVL = 97;
+
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheKeyConfiguration keyCfg = new CacheKeyConfiguration("MyCache", "affKey");
+
+        cfg.setCacheKeyConfiguration(keyCfg);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * @param name Cache name.
+     * @param partitioned Partition or replicated cache.
+     * @param idxTypes Indexed types.
+     * @return Cache configuration.
+     */
+    private static <K, V> CacheConfiguration<K, V> cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) {
+        return new CacheConfiguration<K, V>()
+            .setName(name)
+            .setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
+            .setQueryParallelism(partitioned ? QRY_PARALLELISM_LVL : 1)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setIndexedTypes(idxTypes);
+    }
+
+    /**
+     * Run tests on single-node grid
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeIndexSegmentation() throws Exception {
+        startGridsMultiThreaded(1, true);
+
+        ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+        ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+        fillCache();
+
+        checkDistributedQueryWithSegmentedIndex();
+
+        checkLocalQueryWithSegmentedIndex();
+    }
+
+    /**
+     * Run tests on multi-node grid
+     * @throws Exception If failed.
+     */
+    public void testMultiNodeIndexSegmentation() throws Exception {
+        startGridsMultiThreaded(4, true);
+
+        ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+        ignite(0).createCache(cacheConfig("org", true, Integer.class, Organization.class));
+
+        fillCache();
+
+        checkDistributedQueryWithSegmentedIndex();
+
+        checkLocalQueryWithSegmentedIndex();
+    }
+
+    /**
+     * Run tests on multi-node grid
+     * @throws Exception If failed.
+     */
+    public void testMultiNodeSegmentedPartitionedWithReplicated() throws Exception {
+        startGridsMultiThreaded(4, true);
+
+        ignite(0).createCache(cacheConfig("pers", true, Integer.class, Person.class));
+        ignite(0).createCache(cacheConfig("org", false, Integer.class, Organization.class));
+
+        fillCache();
+
+        checkDistributedQueryWithSegmentedIndex();
+
+        checkLocalQueryWithSegmentedIndex();
+    }
+
+    /**
+     * Check distributed joins.
+     * @throws Exception If failed.
+     */
+    public void checkDistributedQueryWithSegmentedIndex() throws Exception {
+        IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+
+        int expectedPersons = 0;
+
+        for (Cache.Entry<Integer, Person> e : c1) {
+            final Integer orgId = e.getValue().orgId;
+
+            if (10 <= orgId && orgId < 500)
+                expectedPersons++;
+        }
+
+        String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+        List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll();
+
+        assertEquals(expectedPersons, result.size());
+    }
+
+    /**
+     * Test local query.
+     * @throws Exception If failed.
+     */
+    public void checkLocalQueryWithSegmentedIndex() throws Exception {
+        IgniteCache<Integer, Person> c1 = ignite(0).cache("pers");
+        IgniteCache<Integer, Organization> c2 = ignite(0).cache("org");
+
+        Set<Integer> localOrgIds = new HashSet<>();
+
+        for(Cache.Entry<Integer, Organization> e : c2.localEntries())
+            localOrgIds.add(e.getKey());
+
+        int expectedPersons = 0;
+
+        for (Cache.Entry<Integer, Person> e : c1.localEntries()) {
+            final Integer orgId = e.getValue().orgId;
+
+            if (localOrgIds.contains(orgId))
+                expectedPersons++;
+        }
+
+        String select0 = "select o.name n1, p.name n2 from \"pers\".Person p, \"org\".Organization o where p.orgId = o._key";
+
+        List<List<?>> result = c1.query(new SqlFieldsQuery(select0).setLocal(true)).getAll();
+
+        assertEquals(expectedPersons, result.size());
+    }
+
+    /** */
+    private void fillCache() {
+        IgniteCache<Object, Object> c1 = ignite(0).cache("pers");
+
+        IgniteCache<Object, Object> c2 = ignite(0).cache("org");
+
+        final int orgCount = 500;
+
+        for (int i = 0; i < orgCount; i++)
+            c2.put(i, new Organization("org-" + i));
+
+        final Random random = new Random();
+
+        for (int i = 0; i < 1000; i++) {
+            int orgID = 10 + random.nextInt(orgCount + 10);
+
+            c1.put(i, new Person(orgID, "pers-" + i));
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        Integer orgId;
+
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         *
+         */
+        public Person() {
+            // No-op.
+        }
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         *
+         */
+        public Organization() {
+            // No-op.
+        }
+
+        /**
+         * @param name Organization name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 06afe7c..4ae2f91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -33,9 +33,10 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
@@ -701,6 +702,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 ignite(0).destroyCache(cache.getName());
         }
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -750,6 +752,127 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testIndexSegmentation() throws Exception {
+        CacheConfiguration persCfg = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration orgCfg = cacheConfig("org", true,
+            Integer.class, Organization.class).setQueryParallelism(4);
+
+        IgniteCache<Object, Object> persCache = ignite(0).getOrCreateCache(persCfg);
+        IgniteCache<Object, Object> orgCache = ignite(0).getOrCreateCache(orgCfg);
+
+        try {
+            orgCache.put(1, new Organization("o1"));
+            orgCache.put(2, new Organization("o2"));
+            persCache.put(3, new Person2(1, "p1"));
+            persCache.put(4, new Person2(2, "p2"));
+            persCache.put(5, new Person2(3, "p3"));
+
+            String joinSql = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+            checkQueryPlan(persCache, true, 1, new SqlFieldsQuery(joinSql));
+
+            checkQueryPlan(persCache, true, 1, new SqlFieldsQuery(joinSql).setLocal(true));
+        }
+        finally {
+            persCache.destroy();
+            orgCache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicationCacheIndexSegmentationFailure() throws Exception {
+        GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                CacheConfiguration ccfg = cacheConfig("org", false,
+                    Integer.class, Organization.class).setQueryParallelism(4);
+                // destroy() runs only if the config was wrongly accepted: do not leave "org" behind.
+                ignite(0).createCache(ccfg).destroy();
+
+                return null;
+            }
+        }, CacheException.class, "Cache index segmentation is supported for PARTITIONED mode only.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIndexSegmentationPartitionedReplicated() throws Exception {
+        CacheConfiguration persCfg = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration orgCfg = cacheConfig("org", false,
+            Integer.class, Organization.class);
+
+        final IgniteCache<Object, Object> persCache = ignite(0).getOrCreateCache(persCfg);
+        final IgniteCache<Object, Object> orgCache = ignite(0).getOrCreateCache(orgCfg);
+
+        try {
+            orgCache.put(1, new Organization("o1"));
+            orgCache.put(2, new Organization("o2"));
+            persCache.put(3, new Person2(1, "p1"));
+            persCache.put(4, new Person2(2, "p2"));
+            persCache.put(5, new Person2(3, "p3"));
+
+            String joinSql = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
+
+            final SqlFieldsQuery joinQry = new SqlFieldsQuery(joinSql);
+
+            joinQry.setDistributedJoins(true);
+
+            List<List<?>> rows = persCache.query(joinQry).getAll();
+
+            assertEquals(2, rows.size());
+        }
+        finally {
+            persCache.destroy();
+            orgCache.destroy();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception {
+        CacheConfiguration persCfg = cacheConfig("pers", true,
+            Integer.class, Person2.class).setQueryParallelism(4);
+        CacheConfiguration orgCfg = cacheConfig("org", true,
+            Integer.class, Organization.class).setQueryParallelism(3);
+
+        final IgniteCache<Object, Object> persCache = ignite(0).getOrCreateCache(persCfg);
+        final IgniteCache<Object, Object> orgCache = ignite(0).getOrCreateCache(orgCfg);
+
+        try {
+            orgCache.put(1, new Organization("o1"));
+            orgCache.put(2, new Organization("o2"));
+            persCache.put(3, new Person2(1, "p1"));
+            persCache.put(4, new Person2(2, "p2"));
+            persCache.put(5, new Person2(3, "p3"));
+
+            String joinSql = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
+
+            final SqlFieldsQuery joinQry = new SqlFieldsQuery(joinSql);
+
+            joinQry.setDistributedJoins(true);
+
+            GridTestUtils.assertThrows(log, new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    persCache.query(joinQry);
+
+                    return null;
+                }
+            }, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden.");
+        }
+        finally {
+            persCache.destroy();
+            orgCache.destroy();
+        }
+    }
+
+    /**
      * @param cache Cache.
      * @param sql SQL.
      * @param enforceJoinOrder Enforce join order flag.
@@ -789,26 +912,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             false,
             0,
             select +
-                "from " + cache1 + ","  + cache2 + " "+ where);
+                "from " + cache1 + "," + cache2 + " " + where);
 
         checkQueryPlan(cache,
             false,
             0,
             select +
-                "from " + cache2 + ","  + cache1 + " "+ where);
+                "from " + cache2 + "," + cache1 + " " + where);
 
         if (testEnforceJoinOrder) {
             checkQueryPlan(cache,
                 true,
                 0,
                 select +
-                    "from " + cache1 + ","  + cache2 + " "+ where);
+                    "from " + cache1 + "," + cache2 + " " + where);
 
             checkQueryPlan(cache,
                 true,
                 0,
                 select +
-                    "from " + cache2 + ","  + cache1 + " "+ where);
+                    "from " + cache2 + "," + cache1 + " " + where);
         }
     }
 
@@ -823,7 +946,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         boolean enforceJoinOrder,
         int expBatchedJoins,
         String sql,
-        String...expText) {
+        String... expText) {
         checkQueryPlan(cache,
             enforceJoinOrder,
             expBatchedJoins,
@@ -848,7 +971,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         boolean enforceJoinOrder,
         int expBatchedJoins,
         SqlFieldsQuery qry,
-        String...expText) {
+        String... expText) {
         qry.setEnforceJoinOrder(enforceJoinOrder);
         qry.setDistributedJoins(true);
 
@@ -984,7 +1107,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
      * @param args Arguments.
      * @return Column as list.
      */
-    private static <X> List<X> columnQuery(IgniteCache<?,?> c, String qry, Object... args) {
+    private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
         return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 52084c7..d6a5fb1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -230,16 +230,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(0, spi.size(typeAB.space(), typeAB));
         assertEquals(0, spi.size(typeBA.space(), typeBA));
 
-        assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeAA.space(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeAB.space(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
+        assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         assertFalse(spi.queryLocalSql(typeBA.space(), "select * from B.A, A.B, A.A", null,
-            Collections.emptySet(), typeBA, null).hasNext());
+            Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         try {
             spi.queryLocalSql(typeBA.space(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
-                Collections.emptySet(), typeBA, null).hasNext();
+                Collections.emptySet(), typeBA.name(), null, null).hasNext();
 
             fail("Enumerations of aliases in select block must be prohibited");
         }
@@ -248,10 +248,10 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         }
 
         assertFalse(spi.queryLocalSql(typeAB.space(), "select ab.* from A.B ab", null,
-            Collections.emptySet(), typeAB, null).hasNext());
+            Collections.emptySet(), typeAB.name(), null, null).hasNext());
 
         assertFalse(spi.queryLocalSql(typeBA.space(), "select   ba.*   from B.A  as ba", null,
-            Collections.emptySet(), typeBA, null).hasNext());
+            Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         // Nothing to remove.
         spi.remove("A", key(1), aa(1, "", 10));
@@ -305,7 +305,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
 
         // Query data.
         Iterator<IgniteBiTuple<Integer, Map<String, Object>>> res =
-            spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA, null);
+            spi.queryLocalSql(typeAA.space(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -314,7 +314,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
         res = spi.queryLocalSql(typeAA.space(), "select aa.* from a aa order by aa.age", null,
-            Collections.emptySet(), typeAA, null);
+            Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(aa(3, "Borya", 18).value(null, false), value(res.next()));
@@ -322,7 +322,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(aa(2, "Valera", 19).value(null, false), value(res.next()));
         assertFalse(res.hasNext());
 
-        res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB, null);
+        res = spi.queryLocalSql(typeAB.space(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -331,7 +331,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
         res = spi.queryLocalSql(typeAB.space(), "select bb.* from b as bb order by bb.name", null,
-            Collections.emptySet(), typeAB, null);
+            Collections.emptySet(), typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").value(null, false), value(res.next()));
@@ -340,7 +340,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertFalse(res.hasNext());
 
 
-        res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA, null);
+        res = spi.queryLocalSql(typeBA.space(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ba(2, "Kolya", 25, true).value(null, false), value(res.next()));
@@ -725,4 +725,4 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
             throw new UnsupportedOperationException();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
new file mode 100644
index 0000000..d1d80f2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/loadtests/h2indexing/FetchingQueryCursorStressTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.loadtests.h2indexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+
+/**
+ * SQL query stress test: starts {@link #NODE_CNT} server nodes plus one client node, loads
+ * {@link #ENTRIES_CNT} entries and runs one query from {@link #THREAD_CNT} threads for
+ * {@link #TIMEOUT} milliseconds while a printer thread reports the throughput.
+ */
+public class FetchingQueryCursorStressTest {
+    /** Number of server nodes; one client node is started in addition. */
+    private static final int NODE_CNT = 4;
+
+    /** Number of entries. */
+    private static final int ENTRIES_CNT = 10_000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Thread count. */
+    private static final int THREAD_CNT = 16;
+
+    /** Execution counter. */
+    private static final AtomicLong CNT = new AtomicLong();
+
+    /** Verbose mode. */
+    private static final boolean VERBOSE = false;
+
+    /** How long the query threads are left running, in milliseconds. */
+    private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(30);
+
+    /** First failure reported by a query thread; once set, every worker loop exits. */
+    public static final AtomicReference<Exception> error = new AtomicReference<>();
+
+    /** Set by {@link #main(String[])} when {@link #TIMEOUT} has elapsed; tells worker loops to finish. */
+    private static volatile boolean stopped;
+
+    /**
+     * Entry point.
+     *
+     * @param args Command line arguments (not used).
+     * @throws Exception If startup or data loading failed, or the first failure reported by a query thread.
+     */
+    public static void main(String[] args) throws Exception {
+        List<Thread> threads = new ArrayList<>(THREAD_CNT + 1);
+
+        try (Ignite ignite = start()) {
+            IgniteCache<Integer, Person> cache = ignite.cache(CACHE_NAME);
+
+            loadData(ignite, cache);
+
+            System.out.println("Loaded data: " + cache.size());
+
+            for (int i = 0; i < THREAD_CNT; i++)
+                threads.add(startDaemon("qry-exec-" + i, new QueryExecutor(cache, "Select * from Person")));
+
+            threads.add(startDaemon("printer", new ThroughputPrinter()));
+
+            Thread.sleep(TIMEOUT);
+
+            // Workers otherwise stop only on error, so without this signal the joins below never return.
+            stopped = true;
+
+            for (Thread t : threads)
+                t.join();
+
+            Exception err = error.get();
+
+            if (err != null)
+                throw err;
+        }
+        finally {
+            Ignition.stopAll(false);
+        }
+    }
+
+    /**
+     * Start daemon thread.
+     *
+     * @param name Name.
+     * @param r Runnable.
+     * @return Started thread.
+     */
+    private static Thread startDaemon(String name, Runnable r) {
+        Thread t = new Thread(r);
+
+        t.setName(name);
+        t.setDaemon(true);
+
+        t.start();
+
+        return t;
+    }
+
+    /**
+     * Load data into Ignite.
+     *
+     * @param ignite Ignite.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private static void loadData(Ignite ignite, IgniteCache<Integer, Person> cache) throws Exception {
+        try (IgniteDataStreamer<Object, Object> str = ignite.dataStreamer(cache.getName())) {
+            for (int id = 0; id < ENTRIES_CNT; id++)
+                str.addData(id, new Person(id, "John" + id, "Doe"));
+        }
+    }
+
+    /**
+     * Start topology.
+     *
+     * @return Client node.
+     */
+    private static Ignite start() {
+        int i = 0;
+
+        for (; i < NODE_CNT; i++)
+            Ignition.start(config(i, false));
+
+        return Ignition.start(config(i, true));
+    }
+
+    /**
+     * Create configuration.
+     *
+     * @param idx Index.
+     * @param client Client flag.
+     * @return Configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private static IgniteConfiguration config(int idx, boolean client) {
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName("grid-" + idx);
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setIndexedTypes(Integer.class, Person.class);
+        cfg.setMarshaller(new OptimizedMarshaller());
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setLocalHost("127.0.0.1");
+
+        return cfg;
+    }
+
+    /**
+     * Cached value type whose three instance fields are annotated as SQL fields.
+     */
+    private static class Person implements Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Identifier. */
+        @QuerySqlField
+        private int id;
+
+        /** First name. */
+        @QuerySqlField
+        private String firstName;
+
+        /** Last name. */
+        @QuerySqlField
+        private String lastName;
+
+        /**
+         * @param id Identifier.
+         * @param firstName First name.
+         * @param lastName Last name.
+         */
+        public Person(int id, String firstName, String lastName) {
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+        }
+    }
+
+    /**
+     * Query runner.
+     */
+    private static class QueryExecutor implements Runnable {
+        /** Cache. */
+        private final IgniteCache<Integer, Person> cache;
+
+        /** SQL text to execute. */
+        private final String query;
+
+        /**
+         * Constructor.
+         *
+         * @param cache Cache.
+         * @param query SQL text to execute.
+         */
+        public QueryExecutor(IgniteCache<Integer, Person> cache, String query) {
+            this.cache = cache;
+            this.query = query;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            System.out.println("Executor started: " + Thread.currentThread().getName());
+
+            try {
+                while (!stopped && error.get() == null && !Thread.currentThread().isInterrupted()) {
+                    long start = System.nanoTime();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery(query);
+
+                    Set<Integer> extIds = new HashSet<>();
+
+                    for (List<?> next : cache.query(qry))
+                        extIds.add((Integer)next.get(0));
+
+                    long dur = (System.nanoTime() - start) / 1_000_000;
+
+                    CNT.incrementAndGet();
+
+                    if (VERBOSE)
+                        System.out.println("[extIds=" + extIds.size() + ", dur=" + dur + ']');
+                }
+            }
+            catch (CacheException ex) {
+                // Only the first failure is kept; the other workers see it and leave their loops.
+                error.compareAndSet(null, ex);
+            }
+        }
+    }
+
+    /**
+     * Throughput printer.
+     */
+    private static class ThroughputPrinter implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stopped && error.get() == null) {
+                long before = CNT.get();
+                long beforeTime = System.currentTimeMillis();
+
+                try {
+                    Thread.sleep(2000L);
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+
+                long after = CNT.get();
+                long afterTime = System.currentTimeMillis();
+
+                double res = 1000 * ((double)(after - before)) / (afterTime - beforeTime);
+
+                System.out.println((long)res + " ops/sec");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 386b8fd..b417b0a 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTes
 import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
@@ -134,6 +135,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
 
         // Queries tests.
         suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
+        suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
         suite.addTestSuite(IgniteSqlSchemaIndexingTest.class);
         suite.addTestSuite(GridCacheQueryIndexDisabledSelfTest.class);
         suite.addTestSuite(IgniteCacheQueryLoadSelfTest.class);


Mime
View raw message