ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [34/50] [abbrv] ignite git commit: ignite-1.9 - SQL related fixes and improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries fixes
Date Mon, 13 Mar 2017 10:28:08 GMT
ignite-1.9 - SQL related fixes and improvements:
 - Sorted MERGE index
 - EXPLAIN fixes
 - Replicated subqueries fixes

Squashed commit of the following:

commit 423c2155c85ed9be8dffb3517b7331b753e1ce5c
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Thu Mar 9 23:21:38 2017 +0300

    ignite-1.9.1 - test fix

commit ff3c1f2967905b0bcac7661014656d1c080fa803
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Thu Mar 9 11:08:34 2017 +0300

    ignite-1.9.0 - replicated subqueries fix

commit bc0801a3c976f5d87cab2c414f76f69dc28b43d7
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Wed Mar 8 16:03:40 2017 +0300

    ignite-1.9.0 - fix for distributed join test

commit f1f1d96c6babaadab9e3ed1fbb3c9740c94d8209
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Wed Mar 8 15:28:44 2017 +0300

    ignite-1.9.0 - fix for distributed join test

commit a8751d535b3e025a804c441204465e94035a5247
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Tue Feb 28 18:46:07 2017 +0300

    ignite-1.9 - splitter fixes

commit 0601ce6e291eb4689d526e922b02fd9e21df5b08
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Sun Feb 26 23:24:14 2017 +0300

    ignite-1.9 - merge index test

commit 4ad048e248157d799a325b3ce9975d4ad8a9fb49
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Sun Feb 26 23:19:49 2017 +0300

    ignite-1.9 - merge index

commit 4ea63d7335000b8f30bfbd1bb907e411cd62a5e8
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Sun Feb 26 22:44:51 2017 +0300

    ignite-1.9 - unsorted index fixed

commit a639bff6f25a8397e49a892f830c9de23c847127
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Sun Feb 26 20:08:26 2017 +0300

    ignite-1.9 - sorted index fixes2

commit ee9d524f5a0d6f1c416345822e8201c327f1e562
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Fri Feb 24 16:00:26 2017 +0300

    ignite-1.9 - sorted index fixes

commit fc42406a9e55851d53d9dfed8e6cf3c8b12af345
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Date:   Thu Feb 23 16:46:39 2017 +0300

    ignite-1.9 - sorted index


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8817190e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8817190e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8817190e

Branch: refs/heads/ignite-4565-ddl
Commit: 8817190e1dd31d869682df0167bb3e82fb597aad
Parents: 8362fe7
Author: Sergi Vladykin <sergi.vladykin@gmail.com>
Authored: Thu Mar 9 23:30:09 2017 +0300
Committer: Sergi Vladykin <sergi.vladykin@gmail.com>
Committed: Thu Mar 9 23:30:09 2017 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheSqlQuery.java          |  82 ++++-
 .../processors/query/h2/IgniteH2Indexing.java   |   4 +-
 .../query/h2/opt/GridH2CollocationModel.java    |   6 +-
 .../query/h2/opt/GridH2ScanIndex.java           | 273 +++++++++++++++++
 .../processors/query/h2/opt/GridH2Table.java    | 244 +--------------
 .../processors/query/h2/sql/GridSqlQuery.java   |  17 --
 .../query/h2/sql/GridSqlQueryParser.java        |   4 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  38 ++-
 .../query/h2/sql/GridSqlSortColumn.java         |  41 +++
 .../query/h2/twostep/GridMapQueryExecutor.java  |  83 +++--
 .../query/h2/twostep/GridMergeIndex.java        | 300 +++++++++++--------
 .../query/h2/twostep/GridMergeIndexSorted.java  | 172 ++++++++---
 .../h2/twostep/GridMergeIndexUnsorted.java      |  67 ++++-
 .../query/h2/twostep/GridMergeTable.java        |  70 ++++-
 .../h2/twostep/GridReduceQueryExecutor.java     | 101 +++++--
 .../query/h2/twostep/GridResultPage.java        |  34 ++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |  11 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  10 +-
 .../query/IgniteSqlSplitterSelfTest.java        | 100 ++++++-
 .../query/h2/sql/H2CompareBigQueryTest.java     |   4 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 .../processors/query/h2/sql/bigQuery.sql        |  34 ++-
 22 files changed, 1138 insertions(+), 559 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 18688b7..c4bb205 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -74,6 +76,19 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     /** Field kept for backward compatibility. */
     private String alias;
 
+    /** Sort columns. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private transient List<?> sort;
+
+    /** If we have partitioned tables in this query. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private transient boolean partitioned;
+
+    /** Single node to execute the query on. */
+    private UUID node;
+
     /**
      * For {@link Message}.
      */
@@ -218,12 +233,18 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                if (!writer.writeUuid("node", node))
                     return false;
 
                 writer.incrementState();
 
             case 2:
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
                 if (!writer.writeString("qry", qry))
                     return false;
 
@@ -251,7 +272,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 1:
-                paramsBytes = reader.readByteArray("paramsBytes");
+                node = reader.readUuid("node");
 
                 if (!reader.isLastRead())
                     return false;
@@ -259,6 +280,14 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 2:
+                paramsBytes = reader.readByteArray("paramsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
                 qry = reader.readString("qry");
 
                 if (!reader.isLastRead())
@@ -278,7 +307,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 4;
     }
 
     /**
@@ -292,6 +321,8 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
         cp.cols = cols;
         cp.paramIdxs = paramIdxs;
         cp.paramsSize = paramsSize;
+        cp.sort = sort;
+        cp.partitioned = partitioned;
 
         if (F.isEmpty(args))
             cp.params = EMPTY_PARAMS;
@@ -304,4 +335,49 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
 
         return cp;
     }
+
+    /**
+     * @param sort Sort columns.
+     */
+    public void sortColumns(List<?> sort) {
+        this.sort = sort;
+    }
+
+    /**
+     * @return Sort columns.
+     */
+    public List<?> sortColumns() {
+        return sort;
+    }
+
+    /**
+     * @param partitioned If the query contains partitioned tables.
+     */
+    public void partitioned(boolean partitioned) {
+        this.partitioned = partitioned;
+    }
+
+    /**
+     * @return {@code true} If the query contains partitioned tables.
+     */
+    public boolean isPartitioned() {
+        return partitioned;
+    }
+
+    /**
+     * @return Single node to execute the query on or {@code null} if need to execute on all the nodes.
+     */
+    public UUID node() {
+        return node;
+    }
+
+    /**
+     * @param node Single node to execute the query on or {@code null} if need to execute on all the nodes.
+     * @return {@code this}.
+     */
+    public GridCacheSqlQuery node(UUID node) {
+        this.node = node;
+
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index b4bf608..8de8dc4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -155,7 +154,6 @@ import org.h2.result.SortOrder;
 import org.h2.server.web.WebServer;
 import org.h2.table.Column;
 import org.h2.table.IndexColumn;
-import org.h2.table.Table;
 import org.h2.tools.Server;
 import org.h2.util.JdbcUtils;
 import org.h2.value.DataType;
@@ -1453,7 +1451,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     }
 
 
-                    Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+                    Prepared prepared = GridSqlQueryParser.prepared(stmt);
 
                     if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
                         throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
index ce11fd5..4df355e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java
@@ -300,10 +300,10 @@ public final class GridH2CollocationModel {
             assert childFilters == null;
 
             // We are at table instance.
-            GridH2Table tbl = (GridH2Table)filter().getTable();
+            Table tbl = filter().getTable();
 
             // Only partitioned tables will do distributed joins.
-            if (!tbl.isPartitioned()) {
+            if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) {
                 type = Type.REPLICATED;
                 multiplier = MULTIPLIER_COLLOCATED;
 
@@ -593,7 +593,7 @@ public final class GridH2CollocationModel {
     private GridH2CollocationModel child(int i, boolean create) {
         GridH2CollocationModel child = children[i];
 
-        if (child == null && create && isChildTableOrView(i, null)) {
+        if (child == null && create) {
             TableFilter f = childFilters[i];
 
             if (f.getTable().isView()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
new file mode 100644
index 0000000..3ddd490
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import org.h2.engine.Database;
+import org.h2.engine.DbObject;
+import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.IndexLookupBatch;
+import org.h2.index.IndexType;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.schema.Schema;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+import org.h2.table.TableFilter;
+
+/**
+ * Scan index base class.
+ */
+public abstract class GridH2ScanIndex<D extends BaseIndex> extends BaseIndex {
+    /** */
+    private static final IndexType TYPE = IndexType.createScan(false);
+
+    /** */
+    protected final D delegate;
+
+    /**
+     * @param delegate Delegate.
+     */
+    public GridH2ScanIndex(D delegate) {
+        this.delegate = delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDiskSpaceUsed() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void add(Session ses, Row row) {
+        delegate.add(ses, row);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canFindNext() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canGetFirstOrLast() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canScan() {
+        return delegate.canScan();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void close(Session ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit(int operation, Row row) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareRows(SearchRow rowData, SearchRow compare) {
+        return delegate.compareRows(rowData, compare);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
+        return find(filter.getSession(), first, last);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
+        return delegate.find(ses, null, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor findFirstOrLast(Session ses, boolean first) {
+        throw DbException.getUnsupportedException("SCAN");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
+        throw DbException.throwInternalError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getColumnIndex(Column col) {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Column[] getColumns() {
+        return delegate.getColumns();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IndexColumn[] getIndexColumns() {
+        return delegate.getIndexColumns();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IndexType getIndexType() {
+        return TYPE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row getRow(Session ses, long key) {
+        return delegate.getRow(ses, key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCount(Session ses) {
+        return delegate.getRowCount(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCountApproximation() {
+        return delegate.getRowCountApproximation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Table getTable() {
+        return delegate.getTable();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRowIdIndex() {
+        return delegate.isRowIdIndex();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean needRebuild() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(Session ses, Row row) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSortedInsertMode(boolean sortedInsertMode) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+        return delegate.createLookupBatch(filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void truncate(Session ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Schema getSchema() {
+        return delegate.getSchema();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isHidden() {
+        return delegate.isHidden();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkRename() {
+        throw DbException.getUnsupportedException("rename");
+    }
+
+    /** {@inheritDoc} */
+    @Override public ArrayList<DbObject> getChildren() {
+        return delegate.getChildren();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getComment() {
+        return delegate.getComment();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getCreateSQL() {
+        return null; // Scan should return null.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
+        return delegate.getCreateSQLForCopy(tbl, quotedName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Database getDatabase() {
+        return delegate.getDatabase();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDropSQL() {
+        return delegate.getDropSQL();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getId() {
+        return delegate.getId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSQL() {
+        return delegate.getSQL();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getType() {
+        return delegate.getType();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTemporary() {
+        return delegate.isTemporary();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeChildrenAndResources(Session ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rename(String newName) {
+        throw DbException.getUnsupportedException("rename");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setComment(String comment) {
+        throw DbException.getUnsupportedException("comment");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTemporary(boolean temporary) {
+        throw DbException.getUnsupportedException("temporary");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 8d080ae..4d5ea4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -34,22 +34,14 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.h2.api.TableEngine;
 import org.h2.command.ddl.CreateTableData;
-import org.h2.engine.Database;
-import org.h2.engine.DbObject;
 import org.h2.engine.Session;
-import org.h2.index.BaseIndex;
-import org.h2.index.Cursor;
 import org.h2.index.Index;
-import org.h2.index.IndexLookupBatch;
 import org.h2.index.IndexType;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
-import org.h2.schema.Schema;
-import org.h2.table.Column;
 import org.h2.table.IndexColumn;
-import org.h2.table.Table;
 import org.h2.table.TableBase;
 import org.h2.table.TableFilter;
 import org.h2.value.Value;
@@ -857,93 +849,15 @@ public class GridH2Table extends TableBase {
      * Wrapper type for primary key.
      */
     @SuppressWarnings("PackageVisibleInnerClass")
-    static class ScanIndex extends BaseIndex {
+    static class ScanIndex extends GridH2ScanIndex<GridH2IndexBase> {
         /** */
         static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_";
 
-        /** */
-        private static final IndexType TYPE = IndexType.createScan(false);
-
-        /** */
-        private final GridH2IndexBase delegate;
-
         /**
-         * Constructor.
-         *
-         * @param delegate Index delegate to.
+         * @param delegate Delegate.
          */
-        private ScanIndex(GridH2IndexBase delegate) {
-            this.delegate = delegate;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long getDiskSpaceUsed() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void add(Session ses, Row row) {
-            delegate.add(ses, row);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean canFindNext() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean canGetFirstOrLast() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean canScan() {
-            return delegate.canScan();
-        }
-
-        /** {@inheritDoc} */
-        @Override public final void close(Session ses) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void commit(int operation, Row row) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int compareRows(SearchRow rowData, SearchRow compare) {
-            return delegate.compareRows(rowData, compare);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
-            return find(filter.getSession(), first, last);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
-            return delegate.find(ses, null, null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Cursor findFirstOrLast(Session ses, boolean first) {
-            throw DbException.getUnsupportedException("SCAN");
-        }
-
-        /** {@inheritDoc} */
-        @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) {
-            throw DbException.throwInternalError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getColumnIndex(Column col) {
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Column[] getColumns() {
-            return delegate.getColumns();
+        public ScanIndex(GridH2IndexBase delegate) {
+            super(delegate);
         }
 
         /** {@inheritDoc} */
@@ -957,163 +871,13 @@ public class GridH2Table extends TableBase {
         }
 
         /** {@inheritDoc} */
-        @Override public IndexColumn[] getIndexColumns() {
-            return delegate.getIndexColumns();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IndexType getIndexType() {
-            return TYPE;
-        }
-
-        /** {@inheritDoc} */
         @Override public String getPlanSQL() {
             return delegate.getTable().getSQL() + "." + SCAN_INDEX_NAME_SUFFIX;
         }
 
         /** {@inheritDoc} */
-        @Override public Row getRow(Session ses, long key) {
-            return delegate.getRow(ses, key);
-        }
-
-        /** {@inheritDoc} */
-        @Override public long getRowCount(Session ses) {
-            return delegate.getRowCount(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public long getRowCountApproximation() {
-            return delegate.getRowCountApproximation();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Table getTable() {
-            return delegate.getTable();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isRowIdIndex() {
-            return delegate.isRowIdIndex();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean needRebuild() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove(Session ses) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove(Session ses, Row row) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setSortedInsertMode(boolean sortedInsertMode) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
-            return delegate.createLookupBatch(filter);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void truncate(Session ses) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Schema getSchema() {
-            return delegate.getSchema();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isHidden() {
-            return delegate.isHidden();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void checkRename() {
-            throw DbException.getUnsupportedException("rename");
-        }
-
-        /** {@inheritDoc} */
-        @Override public ArrayList<DbObject> getChildren() {
-            return delegate.getChildren();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getComment() {
-            return delegate.getComment();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getCreateSQL() {
-            return null; // Scan should return null.
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getCreateSQLForCopy(Table tbl, String quotedName) {
-            return delegate.getCreateSQLForCopy(tbl, quotedName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Database getDatabase() {
-            return delegate.getDatabase();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String getDropSQL() {
-            return delegate.getDropSQL();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getId() {
-            return delegate.getId();
-        }
-
-        /** {@inheritDoc} */
         @Override public String getName() {
             return delegate.getName() + SCAN_INDEX_NAME_SUFFIX;
         }
-
-        /** {@inheritDoc} */
-        @Override public String getSQL() {
-            return delegate.getSQL();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getType() {
-            return delegate.getType();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isTemporary() {
-            return delegate.isTemporary();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeChildrenAndResources(Session ses) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void rename(String newName) {
-            throw DbException.getUnsupportedException("rename");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setComment(String comment) {
-            throw DbException.getUnsupportedException("comment");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void setTemporary(boolean temporary) {
-            throw DbException.getUnsupportedException("temporary");
-        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
index 7d4b7f0..9511866 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java
@@ -38,9 +38,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
     /** */
     private GridSqlAst offset;
 
-    /** */
-    private boolean distinct;
-
     /**
      * @return Offset.
      */
@@ -56,20 +53,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs
     }
 
     /**
-     * @return Distinct.
-     */
-    public boolean distinct() {
-        return distinct;
-    }
-
-    /**
-     * @param distinct New distinct.
-     */
-    public void distinct(boolean distinct) {
-        this.distinct = distinct;
-    }
-
-    /**
      * @return Sort.
      */
     public List<GridSqlSortColumn> sort() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 024529c..16d7105 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -415,7 +415,7 @@ public class GridSqlQueryParser {
         res.distinct(select.isDistinct());
 
         Expression where = CONDITION.get(select);
-        res.where(parseExpression(where, false));
+        res.where(parseExpression(where, true));
 
         ArrayList<TableFilter> tableFilters = new ArrayList<>();
 
@@ -447,7 +447,7 @@ public class GridSqlQueryParser {
             GridSqlElement gridFilter = parseTableFilter(f);
 
             from = from == null ? gridFilter : new GridSqlJoin(from, gridFilter, f.isJoinOuter(),
-                parseExpression(f.getJoinCondition(), false));
+                parseExpression(f.getJoinCondition(), true));
         }
 
         res.from(from);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 277cabc..aec0b36 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -207,7 +207,6 @@ public class GridSqlQuerySplitter {
         // If we have distributed joins, then we have to optimize all MAP side queries
         // to have a correct join order with respect to batched joins and check if we need
         // distributed joins at all.
-        // TODO Also we need to have a list of table aliases to filter by primary or explicit partitions.
         if (distributedJoins) {
             boolean allCollocated = true;
 
@@ -220,7 +219,7 @@ public class GridSqlQuerySplitter {
                 mapSqlQry.query(parse(prepared, true).getSQL());
             }
 
-            // We do not need distributed joins if all MAP queries are colocated.
+            // We do not need distributed joins if all MAP queries are collocated.
             if (allCollocated)
                 distributedJoins = false;
         }
@@ -861,6 +860,7 @@ public class GridSqlQuerySplitter {
         if (!tblAliases.contains(tblAlias))
             return;
 
+        GridSqlType resType = col.resultType();
         String uniqueColAlias = uniqueColumnAlias(col);
         GridSqlAlias colAlias = cols.get(uniqueColAlias);
 
@@ -874,6 +874,7 @@ public class GridSqlQuerySplitter {
         col = column(uniqueColAlias);
         // col.tableAlias(wrapAlias.alias());
         col.expressionInFrom(wrapAlias);
+        col.resultType(resType);
 
         prnt.child(childIdx, col);
     }
@@ -1066,7 +1067,7 @@ public class GridSqlQuerySplitter {
         else if (qrym.type == Type.UNION) {
             // If it is not a UNION ALL, then we have to split because otherwise we can produce duplicates or
             // wrong results for UNION DISTINCT, EXCEPT, INTERSECT queries.
-            if (!qrym.needSplitChild && !qrym.unionAll)
+            if (!qrym.needSplitChild && (!qrym.unionAll || hasOffsetLimit(qrym.<GridSqlUnion>ast())))
                 qrym.needSplitChild = true;
 
             // If we have to split some child SELECT in this UNION, then we have to enforce split
@@ -1151,6 +1152,14 @@ public class GridSqlQuerySplitter {
     }
 
     /**
+     * @param qry Query.
+     * @return {@code true} If we have OFFSET LIMIT.
+     */
+    private static boolean hasOffsetLimit(GridSqlQuery qry) {
+        return qry.limit() != null || qry.offset() != null;
+    }
+
+    /**
      * @param select Select to check.
      * @return {@code true} If we need to split this select.
      */
@@ -1158,6 +1167,9 @@ public class GridSqlQuerySplitter {
         if (select.distinct())
             return true;
 
+        if (hasOffsetLimit(select))
+            return true;
+
         if (collocatedGrpBy)
             return false;
 
@@ -1304,11 +1316,29 @@ public class GridSqlQuerySplitter {
 
         setupParameters(map, mapQry, params);
         map.columns(collectColumns(mapExps));
+        map.sortColumns(mapQry.sort());
+        map.partitioned(hasPartitionedTables(mapQry));
 
         mapSqlQrys.add(map);
     }
 
     /**
+     * @param ast Map query AST.
+     * @return {@code true} If the given AST has partitioned tables.
+     */
+    private static boolean hasPartitionedTables(GridSqlAst ast) {
+        if (ast instanceof GridSqlTable)
+            return ((GridSqlTable)ast).dataTable().isPartitioned();
+
+        for (int i = 0; i < ast.size(); i++) {
+            if (hasPartitionedTables(ast.child(i)))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param sqlQry Query.
      * @param qryAst Select AST.
      * @param params All parameters.
@@ -1333,7 +1363,7 @@ public class GridSqlQuerySplitter {
             GridSqlType t = col.resultType();
 
             if (t == null)
-                throw new NullPointerException("Column type.");
+                throw new NullPointerException("Column type: " + col);
 
             if (t == GridSqlType.UNKNOWN)
                 throw new IllegalStateException("Unknown type: " + col);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
index 8e8947f..d870ac5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java
@@ -17,6 +17,13 @@
 
 package org.apache.ignite.internal.processors.query.h2.sql;
 
+import java.util.List;
+import org.apache.ignite.internal.util.typedef.F;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.Table;
+
 /**
  * Sort order for ORDER BY clause.
  */
@@ -47,6 +54,40 @@ public class GridSqlSortColumn {
     }
 
     /**
+     * @param tbl Table.
+     * @param sortCols Sort columns.
+     * @return Index columns.
+     */
+    public static IndexColumn[] toIndexColumns(Table tbl, List<GridSqlSortColumn> sortCols) {
+        assert !F.isEmpty(sortCols);
+
+        IndexColumn[] res = new IndexColumn[sortCols.size()];
+
+        for (int i = 0; i < res.length; i++) {
+            GridSqlSortColumn sc = sortCols.get(i);
+
+            Column col = tbl.getColumn(sc.column());
+
+            IndexColumn c = new IndexColumn();
+
+            c.column = col;
+            c.columnName = col.getName();
+
+            c.sortType = sc.asc ? SortOrder.ASCENDING : SortOrder.DESCENDING;
+
+            if (sc.nullsFirst)
+                c.sortType |= SortOrder.NULLS_FIRST;
+
+            if (sc.nullsLast)
+                c.sortType |= SortOrder.NULLS_LAST;
+
+            res[i] = c;
+        }
+
+        return res;
+    }
+
+    /**
      * @return Column index.
      */
     public int column() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f002a5e..6416b21 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -459,8 +459,11 @@ public class GridMapQueryExecutor {
             req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS));
 
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
+        final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
 
-        for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) {
+        int segments = explain ? 1 : mainCctx.config().getQueryParallelism();
+
+        for (int i = 1; i < segments; i++) {
             final int segment = i;
 
             ctx.closure().callLocal(
@@ -587,7 +590,6 @@ public class GridMapQueryExecutor {
 
             Connection conn = h2.connectionForSpace(mainCctx.name());
 
-            // Here we enforce join order to have the same behavior on all the nodes.
             setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);
 
             GridH2QueryContext.set(qctx);
@@ -610,28 +612,34 @@ public class GridMapQueryExecutor {
                 boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
                 for (GridCacheSqlQuery qry : qrys) {
-                    ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
-                        F.asList(qry.parameters()), true,
-                        timeout,
-                        qr.cancels[qryIdx]);
-
-                    if (evt) {
-                        ctx.event().record(new CacheQueryExecutedEvent<>(
-                            node,
-                            "SQL query executed.",
-                            EVT_CACHE_QUERY_EXECUTED,
-                            CacheQueryType.SQL.name(),
-                            mainCctx.namex(),
-                            null,
-                            qry.query(),
-                            null,
-                            null,
-                            qry.parameters(),
-                            node.id(),
-                            null));
-                    }
+                    ResultSet rs = null;
+
+                    // If we are not the target node for this replicated query, just ignore it.
+                    if (qry.node() == null ||
+                        (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+                        rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
+                            F.asList(qry.parameters()), true,
+                            timeout,
+                            qr.cancels[qryIdx]);
+
+                        if (evt) {
+                            ctx.event().record(new CacheQueryExecutedEvent<>(
+                                node,
+                                "SQL query executed.",
+                                EVT_CACHE_QUERY_EXECUTED,
+                                CacheQueryType.SQL.name(),
+                                mainCctx.namex(),
+                                null,
+                                qry.query(),
+                                null,
+                                null,
+                                qry.parameters(),
+                                node.id(),
+                                null));
+                        }
 
-                    assert rs instanceof JdbcResultSet : rs.getClass();
+                        assert rs instanceof JdbcResultSet : rs.getClass();
+                    }
 
                     qr.addResult(qryIdx, qry, node.id(), rs);
 
@@ -751,6 +759,9 @@ public class GridMapQueryExecutor {
 
         assert res != null;
 
+        if (res.closed)
+            return;
+
         int page = res.page;
 
         List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
@@ -1081,21 +1092,31 @@ public class GridMapQueryExecutor {
          * @param qry Query.
          */
         private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
-            this.rs = rs;
             this.cctx = cctx;
             this.qry = qry;
             this.qrySrcNodeId = qrySrcNodeId;
             this.cpNeeded = cctx.isLocalNode(qrySrcNodeId);
 
-            try {
-                res = (ResultInterface)RESULT_FIELD.get(rs);
-            }
-            catch (IllegalAccessException e) {
-                throw new IllegalStateException(e); // Must not happen.
+            if (rs != null) {
+                this.rs = rs;
+                try {
+                    res = (ResultInterface)RESULT_FIELD.get(rs);
+                }
+                catch (IllegalAccessException e) {
+                    throw new IllegalStateException(e); // Must not happen.
+                }
+
+                rowCnt = res.getRowCount();
+                cols = res.getVisibleColumnCount();
             }
+            else {
+                this.rs = null;
+                this.res = null;
+                this.cols = -1;
+                this.rowCnt = -1;
 
-            rowCnt = res.getRowCount();
-            cols = res.getVisibleColumnCount();
+                closed = true;
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 6a6e045..27622bb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -22,20 +22,22 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
@@ -44,13 +46,10 @@ import org.h2.index.IndexType;
 import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
-import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
-import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
-import static java.util.Collections.emptyIterator;
 import static java.util.Objects.requireNonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
@@ -67,6 +66,22 @@ public abstract class GridMergeIndex extends BaseIndex {
     private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
 
     /** */
+    private static final AtomicReferenceFieldUpdater<GridMergeIndex, ConcurrentMap> lastPagesUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages");
+
+    static {
+        if (!U.isPow2(PREFETCH_SIZE)) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be positive and a power of 2.");
+        }
+
+        if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
+        }
+    }
+
+    /** */
     protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
         @Override public int compare(SearchRow rowInList, SearchRow searchRow) {
             int res = compareRows(rowInList, searchRow);
@@ -84,14 +99,11 @@ public abstract class GridMergeIndex extends BaseIndex {
         }
     };
 
-    /** All rows number. */
-    private final AtomicInteger expRowsCnt = new AtomicInteger(0);
-
-    /** Remaining rows per source node ID. */
-    private Map<UUID, Counter[]> remainingRows;
+    /** Row source nodes. */
+    private Set<UUID> sources;
 
     /** */
-    private final AtomicBoolean lastSubmitted = new AtomicBoolean();
+    private int pageSize;
 
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
@@ -107,6 +119,9 @@ public abstract class GridMergeIndex extends BaseIndex {
     /** */
     private final GridKernalContext ctx;
 
+    /** */
+    private volatile ConcurrentMap<SourceKey, Integer> lastPages;
+
     /**
      * @param ctx Context.
      * @param tbl Table.
@@ -129,16 +144,6 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param ctx Context.
      */
     protected GridMergeIndex(GridKernalContext ctx) {
-        if (!U.isPow2(PREFETCH_SIZE)) {
-            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
-                ") must be positive and a power of 2.");
-        }
-
-        if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
-            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
-                ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
-        }
-
         this.ctx = ctx;
 
         fetched = new BlockList<>(PREFETCH_SIZE);
@@ -148,7 +153,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @return Return source nodes for this merge index.
      */
     public Set<UUID> sources() {
-        return remainingRows.keySet();
+        return sources;
     }
 
     /**
@@ -169,17 +174,24 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @return {@code true} If this index needs data from the given source node.
      */
     public boolean hasSource(UUID nodeId) {
-        return remainingRows.containsKey(nodeId);
+        return sources.contains(nodeId);
     }
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session ses) {
-        return expRowsCnt.get();
+        Cursor c = find(ses, null, null);
+
+        long cnt = 0;
+
+        while (c.next())
+            cnt++;
+
+        return cnt;
     }
 
     /** {@inheritDoc} */
     @Override public long getRowCountApproximation() {
-        return getRowCount(null);
+        return 10_000;
     }
 
     /**
@@ -189,27 +201,28 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param segmentsCnt Index segments per table.
      */
     public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
-        assert remainingRows == null;
+        assert sources == null;
 
-        remainingRows = U.newHashMap(nodes.size());
+        sources = new HashSet<>();
 
         for (ClusterNode node : nodes) {
-            Counter[] counters = new Counter[segmentsCnt];
-
-            for (int i = 0; i < segmentsCnt; i++)
-                counters[i] = new Counter();
-
-            if (remainingRows.put(node.id(), counters) != null)
-                throw new IllegalStateException("Duplicate node id: " + node.id());
-
+            if (!sources.add(node.id()))
+                throw new IllegalStateException();
         }
     }
 
     /**
+     * @param pageSize Page size.
+     */
+    public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    /**
      * @param queue Queue to poll.
      * @return Next page.
      */
-    private GridResultPage takeNextPage(BlockingQueue<GridResultPage> queue) {
+    private GridResultPage takeNextPage(Pollable<GridResultPage> queue) {
         GridResultPage page;
 
         for (;;) {
@@ -234,16 +247,17 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param iter Current iterator.
      * @return The same or new iterator.
      */
-    protected final Iterator<Value[]> pollNextIterator(BlockingQueue<GridResultPage> queue, Iterator<Value[]> iter) {
-        while (!iter.hasNext()) {
+    protected final Iterator<Value[]> pollNextIterator(Pollable<GridResultPage> queue, Iterator<Value[]> iter) {
+        if (!iter.hasNext()) {
             GridResultPage page = takeNextPage(queue);
 
-            if (page.isLast())
-                return emptyIterator(); // We are done.
-
-            fetchNextPage(page);
+            if (!page.isLast())
+                page.fetchNextPage(); // Failed will throw an exception here.
 
             iter = page.rows();
+
+            // The received iterator must be empty in the dummy last page or on failure.
+            assert iter.hasNext() || page.isDummyLast() || page.isFail();
         }
 
         return iter;
@@ -253,23 +267,18 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param e Error.
      */
     public void fail(final CacheException e) {
-        for (UUID nodeId0 : remainingRows.keySet()) {
-            addPage0(new GridResultPage(null, nodeId0, null) {
-                @Override public boolean isFail() {
-                    return true;
-                }
-
-                @Override public void fetchNextPage() {
-                    throw e;
-                }
-            });
-        }
+        for (UUID nodeId : sources)
+            fail(nodeId, e);
     }
 
     /**
      * @param nodeId Node ID.
+     * @param e Exception.
      */
     public void fail(UUID nodeId, final CacheException e) {
+        if (nodeId == null)
+            nodeId = F.first(sources);
+
         addPage0(new GridResultPage(null, nodeId, null) {
             @Override public boolean isFail() {
                 return true;
@@ -285,91 +294,88 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param page Page.
+     * @param nodeId Node ID.
+     * @param res Response.
      */
-    public final void addPage(GridResultPage page) {
-        int pageRowsCnt = page.rowsInPage();
+    private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) {
+        int allRows = res.allRows();
 
-        Counter cnt = remainingRows.get(page.source())[page.res.segmentId()];
+        // If the old protocol we send all rows number in the page 0, other pages have -1.
+        // In the new protocol we do not know it and always have -1, except terminating page,
+        // which has -2. Thus we have to init page counters only when we receive positive value
+        // in the first page.
+        if (allRows < 0 || res.page() != 0)
+            return;
 
-        // RemainingRowsCount should be updated before page adding to avoid race
-        // in GridMergeIndexUnsorted cursor iterator
-        int remainingRowsCount;
+        ConcurrentMap<SourceKey,Integer> lp = lastPages;
 
-        int allRows = page.response().allRows();
+        if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>()))
+            lp = lastPages;
 
-        if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
-            assert cnt.state == State.UNINITIALIZED : "Counter is already initialized.";
+        assert pageSize > 0: pageSize;
 
-            remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt);
+        int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize;
 
-            expRowsCnt.addAndGet(allRows);
+        assert lastPage >= 0: lastPage;
 
-            // Add page before setting initialized flag to avoid race condition with adding last page
-            if (pageRowsCnt > 0)
-                addPage0(page);
+        if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null)
+            throw new IllegalStateException();
+    }
 
-            // We need this separate flag to handle case when the first source contains only one page
-            // and it will signal that all remaining counters are zero and fetch is finished.
-            cnt.state = State.INITIALIZED;
-        }
-        else {
-            remainingRowsCount = cnt.addAndGet(-pageRowsCnt);
+    /**
+     * @param page Page.
+     */
+    private void markLastPage(GridResultPage page) {
+        GridQueryNextPageResponse res = page.response();
 
-            if (pageRowsCnt > 0)
-                addPage0(page);
-        }
+        if (res.allRows() != -2) { // -2 means the last page.
+            UUID nodeId = page.source();
 
-        if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok.
-            if (cnt.state == State.UNINITIALIZED)
-                return;
+            initLastPages(nodeId, res);
 
-            // Guarantee that finished state possible only if counter is zero and all pages was added
-            cnt.state = State.FINISHED;
+            ConcurrentMap<SourceKey,Integer> lp = lastPages;
 
-            for (Counter[] cntrs : remainingRows.values()) { // Check all the sources.
-                for(int i = 0; i < cntrs.length; i++) {
-                    if (cntrs[i].state != State.FINISHED)
-                        return;
-                }
-            }
+            if (lp == null)
+                return; // It was not initialized --> wait for -2.
 
-            if (lastSubmitted.compareAndSet(false, true)) {
-                addPage0(new GridResultPage(null, page.source(), null) {
-                    @Override public boolean isLast() {
-                        return true;
-                    }
-                });
+            Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId()));
+
+            if (lastPage == null)
+                return; // This node may use the new protocol --> wait for -2.
+
+            if (lastPage != res.page()) {
+                assert lastPage > res.page();
+
+                return; // This is not the last page.
             }
         }
+
+        page.setLast(true);
     }
 
     /**
      * @param page Page.
      */
-    protected abstract void addPage0(GridResultPage page);
+    public final void addPage(GridResultPage page) {
+        markLastPage(page);
+        addPage0(page);
+    }
 
     /**
-     * @param page Page.
+     * @param lastPage Real last page.
+     * @return Created dummy page.
      */
-    protected void fetchNextPage(GridResultPage page) {
-        assert !page.isLast();
-
-        if(page.isFail())
-            page.fetchNextPage(); //rethrow exceptions
-
-        assert page.res != null;
-
-        Counter[] counters = remainingRows.get(page.source());
+    protected final GridResultPage createDummyLastPage(GridResultPage lastPage) {
+        assert !lastPage.isDummyLast(); // It must be a real last page.
 
-        int segId = page.res.segmentId();
-
-        Counter counter = counters[segId];
-
-        if (counter.get() != 0)
-            page.fetchNextPage();
+        return new GridResultPage(ctx, lastPage.source(), null).setLast(true);
     }
 
+    /**
+     * @param page Page.
+     */
+    protected abstract void addPage0(GridResultPage page);
+
     /** {@inheritDoc} */
     @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
         checkBounds(lastEvictedRow, first, last);
@@ -381,11 +387,9 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @return {@code true} If we have fetched all the remote rows.
+     * @return {@code true} If we have fetched all the remote rows into a fetched list.
      */
-    public boolean fetchedAll() {
-        return fetchedCnt == expRowsCnt.get();
-    }
+    public abstract boolean fetchedAll();
 
     /**
      * @param lastEvictedRow Last evicted fetched row.
@@ -433,11 +437,6 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
-        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
-    }
-
-    /** {@inheritDoc} */
     @Override public void remove(Session ses) {
         throw DbException.getUnsupportedException("remove index");
     }
@@ -683,14 +682,6 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * Counter with initialization flag.
-     */
-    private static class Counter extends AtomicInteger {
-        /** */
-        volatile State state = State.UNINITIALIZED;
-    }
-
-    /**
      */
     private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
         /** */
@@ -766,4 +757,53 @@ public abstract class GridMergeIndex extends BaseIndex {
             return res;
         }
     }
+
+    /**
+     * Pollable.
+     */
+    protected static interface Pollable<E> {
+        /**
+         * @param timeout Timeout.
+         * @param unit Time unit.
+         * @return Polled value or {@code null} if none.
+         * @throws InterruptedException If interrupted.
+         */
+        E poll(long timeout, TimeUnit unit) throws InterruptedException;
+    }
+
+    /**
+     */
+    private static class SourceKey {
+        final UUID nodeId;
+
+        /** */
+        final int segment;
+
+        /**
+         * @param nodeId Node ID.
+         * @param segment Segment.
+         */
+        SourceKey(UUID nodeId, int segment) {
+            this.nodeId = nodeId;
+            this.segment = segment;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            SourceKey sourceKey = (SourceKey)o;
+
+            if (segment != sourceKey.segment) return false;
+            return nodeId.equals(sourceKey.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = nodeId.hashCode();
+            result = 31 * result + segment;
+            return result;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
index 32c676d..361bb2d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -25,18 +25,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
@@ -48,6 +54,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase
  */
 public final class GridMergeIndexSorted extends GridMergeIndex {
     /** */
+    private static final IndexType TYPE = IndexType.createNonUnique(false);
+
+    /** */
     private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() {
         @Override public int compare(RowStream o1, RowStream o2) {
             // Nulls at the beginning.
@@ -62,26 +71,33 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     };
 
     /** */
-    private Map<UUID,RowStream> streamsMap;
+    private Map<UUID,RowStream[]> streamsMap;
 
     /** */
-    private RowStream[] streams;
+    private final Lock lock = new ReentrantLock();
+
+    /** */
+    private final Condition notEmpty = lock.newCondition();
+
+    /** */
+    private GridResultPage failPage;
+
+    /** */
+    private MergeStreamIterator it;
 
     /**
      * @param ctx Kernal context.
      * @param tbl Table.
      * @param name Index name,
-     * @param type Index type.
      * @param cols Columns.
      */
     public GridMergeIndexSorted(
         GridKernalContext ctx,
         GridMergeTable tbl,
         String name,
-        IndexType type,
         IndexColumn[] cols
     ) {
-        super(ctx, tbl, name, type, cols);
+        super(ctx, tbl, name, TYPE, cols);
     }
 
     /** {@inheritDoc} */
@@ -89,33 +105,48 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
         super.setSources(nodes, segmentsCnt);
 
         streamsMap = U.newHashMap(nodes.size());
-        streams = new RowStream[nodes.size()];
+        RowStream[] streams = new RowStream[nodes.size() * segmentsCnt];
 
         int i = 0;
 
         for (ClusterNode node : nodes) {
-            RowStream stream = new RowStream(node.id());
+            RowStream[] segments = new RowStream[segmentsCnt];
 
-            streams[i] = stream;
+            for (int s = 0; s < segmentsCnt; s++)
+                streams[i++] = segments[s] = new RowStream();
 
-            if (streamsMap.put(stream.src, stream) != null)
+            if (streamsMap.put(node.id(), segments) != null)
                 throw new IllegalStateException();
         }
+
+        it = new MergeStreamIterator(streams);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fetchedAll() {
+        return it.fetchedAll();
     }
 
     /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
-        if (page.isLast() || page.isFail()) {
-            // Finish all the streams.
-            for (RowStream stream : streams)
-                stream.addPage(page);
+        if (page.isFail()) {
+            lock.lock();
+
+            try {
+                if (failPage == null) {
+                    failPage = page;
+
+                    notEmpty.signalAll();
+                }
+            }
+            finally {
+                lock.unlock();
+            }
         }
         else {
-            assert page.rowsInPage() > 0;
-
             UUID src = page.source();
 
-            streamsMap.get(src).addPage(page);
+            streamsMap.get(src)[page.segmentId()].addPage(page);
         }
     }
 
@@ -153,8 +184,13 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false);
+    }
+
+    /** {@inheritDoc} */
     @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-        return new FetchingCursor(first, last, new MergeStreamIterator());
+        return new FetchingCursor(first, last, it);
     }
 
     /**
@@ -165,17 +201,42 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
         private boolean first = true;
 
         /** */
-        private int off;
+        private volatile int off;
 
         /** */
         private boolean hasNext;
 
+        /** */
+        private final RowStream[] streams;
+
+        /**
+         * @param streams Streams.
+         */
+        MergeStreamIterator(RowStream[] streams) {
+            assert !F.isEmpty(streams);
+
+            this.streams = streams;
+        }
+
+        /**
+         * @return {@code true} If fetched all.
+         */
+        private boolean fetchedAll() {
+            return off == streams.length;
+        }
+
         /**
          *
          */
         private void goFirst() {
+            assert first;
+
+            first = false;
+
             for (int i = 0; i < streams.length; i++) {
-                if (!streams[i].next()) {
+                RowStream s = streams[i];
+
+                if (!s.next()) {
                     streams[i] = null;
                     off++; // Move left bound.
                 }
@@ -183,8 +244,6 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
 
             if (off < streams.length)
                 Arrays.sort(streams, streamCmp);
-
-            first = false;
         }
 
         /**
@@ -229,31 +288,68 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
     /**
      * Row stream.
      */
-    private final class RowStream {
-        /** */
-        final UUID src;
-
-        /** */
-        final BlockingQueue<GridResultPage> queue = new ArrayBlockingQueue<>(8);
-
+    private final class RowStream implements Pollable<GridResultPage> {
         /** */
         Iterator<Value[]> iter = emptyIterator();
 
         /** */
         Row cur;
 
-        /**
-         * @param src Source.
-         */
-        private RowStream(UUID src) {
-            this.src = src;
-        }
+        /** */
+        GridResultPage nextPage;
 
         /**
          * @param page Page.
          */
         private void addPage(GridResultPage page) {
-            queue.offer(page);
+            assert !page.isFail();
+
+            if (page.isLast() && page.rowsInPage() == 0)
+                page = createDummyLastPage(page); // Terminate.
+
+            lock.lock();
+
+            try {
+                // We can fetch the next page only when we have polled the previous one.
+                assert nextPage == null;
+
+                nextPage = page;
+
+                notEmpty.signalAll();
+            }
+            finally {
+                lock.unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException {
+            long nanos = unit.toNanos(timeout);
+
+            lock.lock();
+
+            try {
+                for (;;) {
+                    if (failPage != null)
+                        return failPage;
+
+                    GridResultPage page = nextPage;
+
+                    if (page != null) {
+                        // isLast && !isDummyLast
+                        nextPage = page.isLast() && page.response() != null
+                            ? createDummyLastPage(page) : null; // Terminate with empty iterator.
+
+                        return page;
+                    }
+
+                    if ((nanos = notEmpty.awaitNanos(nanos)) <= 0)
+                        return null;
+                }
+            }
+            finally {
+                lock.unlock();
+            }
         }
 
         /**
@@ -262,7 +358,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex {
         private boolean next() {
             cur = null;
 
-            iter = pollNextIterator(queue, iter);
+            iter = pollNextIterator(this, iter);
 
             if (!iter.hasNext())
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index b69c898..430a687 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -17,19 +17,24 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
+import org.h2.table.TableFilter;
 import org.h2.value.Value;
 
 /**
@@ -37,7 +42,16 @@ import org.h2.value.Value;
  */
 public final class GridMergeIndexUnsorted extends GridMergeIndex {
     /** */
-    private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>();
+    private static final IndexType TYPE = IndexType.createScan(false);
+
+    /** */
+    private final PollableQueue<GridResultPage> queue = new PollableQueue<>();
+
+    /** */
+    private final AtomicInteger activeSources = new AtomicInteger(-1);
+
+    /** */
+    private Iterator<Value[]> iter = Collections.emptyIterator();
 
     /**
      * @param ctx Context.
@@ -45,7 +59,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
      * @param name Index name.
      */
     public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) {
-        super(ctx, tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns()));
+        super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns()));
     }
 
     /**
@@ -64,10 +78,46 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
+    @Override public void setSources(Collection<ClusterNode> nodes, int segmentsCnt) {
+        super.setSources(nodes, segmentsCnt);
+
+        int x = nodes.size() * segmentsCnt;
+
+        assert x > 0: x;
+
+        activeSources.set(x);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fetchedAll() {
+        int x = activeSources.get();
+
+        assert x >= 0: x; // This method must not be called if the sources were not set.
+
+        return x == 0 && queue.isEmpty();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
         assert page.rowsInPage() > 0 || page.isLast() || page.isFail();
 
-        queue.add(page);
+        // Do not add empty page to avoid premature stream termination.
+        if (page.rowsInPage() != 0 || page.isFail())
+            queue.add(page);
+
+        if (page.isLast()) {
+            int x = activeSources.decrementAndGet();
+
+            assert x >= 0: x;
+
+            if (x == 0) // Always terminate with empty iterator.
+                queue.add(createDummyLastPage(page));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+        return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true);
     }
 
     /** {@inheritDoc} */
@@ -80,9 +130,6 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
     @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
         // This index is unsorted: have to ignore bounds.
         return new FetchingCursor(null, null, new Iterator<Row>() {
-            /** */
-            Iterator<Value[]> iter = Collections.emptyIterator();
-
             @Override public boolean hasNext() {
                 iter = pollNextIterator(queue, iter);
 
@@ -98,4 +145,10 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex {
             }
         });
     }
+
+    /**
+     */
+    private static class PollableQueue<X> extends LinkedBlockingQueue<X> implements Pollable<X> {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index 1489021..f7495c0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -18,35 +18,55 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
-import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex;
+import org.apache.ignite.internal.util.typedef.F;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
 import org.h2.index.Index;
 import org.h2.index.IndexType;
 import org.h2.message.DbException;
 import org.h2.result.Row;
+import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableBase;
+import org.h2.table.TableFilter;
 
 /**
  * Merge table for distributed queries.
  */
 public class GridMergeTable extends TableBase {
     /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final GridMergeIndex idx;
+    private ArrayList<Index> idxs;
 
     /**
      * @param data Data.
-     * @param ctx Kernal context.
      */
-    public GridMergeTable(CreateTableData data, GridKernalContext ctx) {
+    public GridMergeTable(CreateTableData data) {
         super(data);
+    }
+
+    /**
+     * @param idxs Indexes.
+     */
+    public void indexes(ArrayList<Index> idxs) {
+        assert !F.isEmpty(idxs);
+
+        this.idxs = idxs;
+    }
 
-        this.ctx = ctx;
-        idx = new GridMergeIndexUnsorted(ctx, this, "merge_scan");
+    /**
+     * @return Merge index.
+     */
+    public GridMergeIndex getMergeIndex() {
+        return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last.
+    }
+
+    /**
+     * @param idx Index.
+     * @return Scan index.
+     */
+    public static GridH2ScanIndex<GridMergeIndex> createScanIndex(GridMergeIndex idx) {
+        return new ScanIndex(idx);
     }
 
     /** {@inheritDoc} */
@@ -56,7 +76,7 @@ public class GridMergeTable extends TableBase {
 
     /** {@inheritDoc} */
     @Override public void close(Session ses) {
-        idx.close(ses);
+        // No-op.
     }
 
     /** {@inheritDoc} */
@@ -96,8 +116,8 @@ public class GridMergeTable extends TableBase {
     }
 
     /** {@inheritDoc} */
-    @Override public GridMergeIndex getScanIndex(Session session) {
-        return idx;
+    @Override public Index getScanIndex(Session session) {
+        return idxs.get(0); // Must be always at 0.
     }
 
     /** {@inheritDoc} */
@@ -107,7 +127,7 @@ public class GridMergeTable extends TableBase {
 
     /** {@inheritDoc} */
     @Override public ArrayList<Index> getIndexes() {
-        return null;
+        return idxs;
     }
 
     /** {@inheritDoc} */
@@ -137,12 +157,12 @@ public class GridMergeTable extends TableBase {
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session ses) {
-        return idx.getRowCount(ses);
+        return getScanIndex(ses).getRowCount(ses);
     }
 
     /** {@inheritDoc} */
     @Override public long getRowCountApproximation() {
-        return idx.getRowCountApproximation();
+        return getScanIndex(null).getRowCountApproximation();
     }
 
     /** {@inheritDoc} */
@@ -154,4 +174,24 @@ public class GridMergeTable extends TableBase {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
+
+    /**
+     * Scan index wrapper.
+     */
+    private static class ScanIndex extends GridH2ScanIndex<GridMergeIndex> {
+        /**
+         * @param delegate Delegate.
+         */
+        public ScanIndex(GridMergeIndex delegate) {
+            super(delegate);
+        }
+
+        /** {@inheritDoc} */
+        @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
+            SortOrder sortOrder) {
+            long rows = getRowCountApproximation();
+
+            return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true);
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message