ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [03/48] incubator-ignite git commit: ignite-gg9499 -
Date Tue, 27 Jan 2015 14:25:48 GMT
ignite-gg9499 -


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/193d9b32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/193d9b32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/193d9b32

Branch: refs/heads/sprint-1
Commit: 193d9b32183cf4b35eb0bdb026c37147942bf15e
Parents: 4da1d1a
Author: S.Vladykin <svladykin@gridgain.com>
Authored: Thu Dec 18 19:29:25 2014 +0300
Committer: S.Vladykin <svladykin@gridgain.com>
Committed: Thu Dec 18 19:29:25 2014 +0300

----------------------------------------------------------------------
 .../org/gridgain/grid/kernal/GridTopic.java     |   5 +-
 .../cache/query/GridCacheQueriesEx.java         |   7 +
 .../cache/query/GridCacheQueriesImpl.java       |   5 +
 .../cache/query/GridCacheQueriesProxy.java      |  12 +
 .../cache/query/GridCacheQueryManager.java      |  26 +-
 .../cache/query/GridCacheSqlQuery.java          |  91 +++++
 .../cache/query/GridCacheSqlResult.java         |  19 ++
 .../cache/query/GridCacheTwoStepQuery.java      |  66 ++++
 .../processors/query/GridQueryIndexing.java     |  10 +
 .../processors/query/GridQueryProcessor.java    |  17 +
 .../java/org/gridgain/grid/util/GridQueue.java  | 340 -------------------
 .../gridgain/grid/util/GridQueueSelfTest.java   |  62 ----
 .../processors/query/h2/GridH2Indexing.java     | 130 ++++---
 .../query/h2/GridH2ResultSetIterator.java       |   3 +-
 .../query/h2/opt/GridH2IndexBase.java           |   5 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 263 ++++++++++++++
 .../query/h2/twostep/GridMergeIndex.java        | 131 +++++--
 .../h2/twostep/GridMergeIndexUnsorted.java      |  74 ++++
 .../query/h2/twostep/GridMergeTable.java        |  33 +-
 .../query/h2/twostep/GridNextPageRequest.java   |  54 ---
 .../query/h2/twostep/GridNextPageResponse.java  |  47 ---
 .../query/h2/twostep/GridQueryAck.java          |  42 ---
 .../query/h2/twostep/GridQueryRequest.java      |  50 ---
 .../h2/twostep/GridReduceQueryExecutor.java     | 199 +++++++++++
 .../query/h2/twostep/GridResultPage.java        |  76 +++++
 .../twostep/messages/GridNextPageRequest.java   |  59 ++++
 .../twostep/messages/GridNextPageResponse.java  | 149 ++++++++
 .../query/h2/twostep/messages/GridQueryAck.java |  34 ++
 .../twostep/messages/GridQueryFailResponse.java |  46 +++
 .../h2/twostep/messages/GridQueryRequest.java   |  61 ++++
 30 files changed, 1402 insertions(+), 714 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
index 5fedbd9..7ab61d9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridTopic.java
@@ -77,7 +77,10 @@ public enum GridTopic {
     TOPIC_TIME_SYNC,
 
     /** */
-    TOPIC_HADOOP;
+    TOPIC_HADOOP,
+
+    /** */
+    TOPIC_QUERY;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
index d1732fb..e854367 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
@@ -10,6 +10,7 @@
 package org.gridgain.grid.kernal.processors.cache.query;
 
 import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
 import org.gridgain.grid.cache.query.*;
 
 import java.util.*;
@@ -41,4 +42,10 @@ public interface GridCacheQueriesEx<K, V> extends GridCacheQueries<K, V> {
      * @return Query.
      */
     public <R> GridCacheQuery<R> createSpiQuery();
+
+    /**
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
index 3ba1ceb..f643cb2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
@@ -158,6 +158,11 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+        return ctx.kernalContext().query().queryTwoStep(qry);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridCacheContinuousQuery<K, V> createContinuousQuery() {
         return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
index 9edcf6a..61f7ac7 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
@@ -166,6 +166,18 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.execute(qry);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteFuture<?> rebuildIndexes(Class<?> cls) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
index 25c0668..5fbc366 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryManager.java
@@ -51,7 +51,7 @@ import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType
 @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
 public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapter<K, V> {
     /** */
-    protected GridQueryProcessor idxProc;
+    protected GridQueryProcessor qryProc;
 
     /** */
     private String space;
@@ -78,7 +78,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
-        idxProc = cctx.kernalContext().query();
+        qryProc = cctx.kernalContext().query();
         space = cctx.name();
         maxIterCnt = cctx.config().getMaximumQueryIteratorCount();
 
@@ -165,7 +165,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to get size (grid is stopping).");
 
         try {
-            return idxProc.size(space, valType);
+            return qryProc.size(space, valType);
         }
         finally {
             leaveBusy();
@@ -193,7 +193,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
 
         try {
-            return idxProc.rebuildIndexes(space, typeName);
+            return qryProc.rebuildIndexes(space, typeName);
         }
         finally {
             leaveBusy();
@@ -210,7 +210,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
 
         try {
-            return idxProc.rebuildAllIndexes();
+            return qryProc.rebuildAllIndexes();
         }
         finally {
             leaveBusy();
@@ -262,7 +262,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onSwap(space, key);
+            qryProc.onSwap(space, key);
         }
         finally {
             leaveBusy();
@@ -282,7 +282,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onUnswap(space, key, val, valBytes);
+            qryProc.onUnswap(space, key, val, valBytes);
         }
         finally {
             leaveBusy();
@@ -324,7 +324,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (val == null)
                 val = cctx.marshaller().unmarshal(valBytes, cctx.deploy().globalLoader());
 
-            idxProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
+            qryProc.store(space, key, keyBytes, val, valBytes, CU.versionToBytes(ver), expirationTime);
         }
         finally {
             invalidateResultCache();
@@ -349,7 +349,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.remove(space, key);
+            qryProc.remove(space, key);
         }
         finally {
             invalidateResultCache();
@@ -368,7 +368,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return; // Ignore index update when node is stopping.
 
         try {
-            idxProc.onUndeploy(space, ldr);
+            qryProc.onUndeploy(space, ldr);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -488,7 +488,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = idxProc.query(space, qry.clause(), F.asList(args),
+                    iter = qryProc.query(space, qry.clause(), F.asList(args),
                         qry.queryClassName(), filter(qry));
 
                     break;
@@ -531,7 +531,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = idxProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
+                    iter = qryProc.queryText(space, qry.clause(), qry.queryClassName(), filter(qry));
 
                     break;
 
@@ -650,7 +650,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             else {
                 assert qry.type() == SQL_FIELDS;
 
-                GridQueryFieldsResult qryRes = idxProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
+                GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
 
                 res.metaData(qryRes.metaData());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
new file mode 100644
index 0000000..025ea29
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
@@ -0,0 +1,91 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Query.
+ */
+public class GridCacheSqlQuery implements Externalizable {
+    /** */
+    private static final Object[] EMPTY_PARAMS = {};
+
+    /** */
+    String alias;
+
+    /** */
+    String qry;
+
+    /** */
+    Object[] params;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridCacheSqlQuery() {
+        // No-op.
+    }
+
+    /**
+     * @param alias Alias.
+     * @param qry Query.
+     * @param params Query parameters.
+     */
+    GridCacheSqlQuery(String alias, String qry, Object[] params) {
+        A.ensure(!F.isEmpty(qry), "qry must not be empty");
+
+        this.alias = alias;
+        this.qry = qry;
+
+        this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
+    }
+
+    /**
+     * @return Alias.
+     */
+    public String alias() {
+        return alias;
+    }
+
+    /**
+     * @return Query.
+     */
+    public String query() {
+        return qry;
+    }
+
+    /**
+     * @return Parameters.
+     */
+    public Object[] parameters() {
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, alias);
+        U.writeString(out, qry);
+        U.writeArray(out, params);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        alias = U.readString(in);
+        qry = U.readString(in);
+        params = U.readArray(in);
+
+        if (F.isEmpty(params))
+            params = EMPTY_PARAMS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
new file mode 100644
index 0000000..ecee21e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlResult.java
@@ -0,0 +1,19 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import java.util.*;
+
+/**
+ * SQL Query result.
+ */
+public interface GridCacheSqlResult extends AutoCloseable, Iterable<List<?>> {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
new file mode 100644
index 0000000..a7c9a02
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -0,0 +1,66 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.cache.query;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Two step map-reduce style query.
+ */
+public class GridCacheTwoStepQuery implements Serializable {
+    /** */
+    private Map<String, GridCacheSqlQuery> mapQrys;
+
+    /** */
+    private GridCacheSqlQuery reduce;
+
+    /**
+     * @param qry Reduce query.
+     * @param params Reduce query parameters.
+     */
+    public GridCacheTwoStepQuery(String qry, Object ... params) {
+        reduce = new GridCacheSqlQuery(null, qry, params);
+    }
+
+    /**
+     * @param alias Alias.
+     * @param qry SQL Query.
+     * @param params Query parameters.
+     */
+    public void addMapQuery(String alias, String qry, Object ... params) {
+        A.ensure(!F.isEmpty(alias), "alias must not be empty");
+
+        if (mapQrys == null)
+            mapQrys = new GridLeanMap<>();
+
+        if (mapQrys.put(alias, new GridCacheSqlQuery(alias, qry, params)) != null)
+            throw new IgniteException("Failed to add query, alias already exists: " + alias + ".");
+    }
+
+    /**
+     * @return Reduce query.
+     */
+    public GridCacheSqlQuery reduceQuery() {
+        return reduce;
+    }
+
+    /**
+     * @return Map queries.
+     */
+    public Collection<GridCacheSqlQuery> mapQueries() {
+        return mapQrys.values();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
index 18b2832..1b9ec6a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
@@ -13,6 +13,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.indexing.*;
 import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.lang.*;
 import org.jetbrains.annotations.*;
 
@@ -37,6 +38,15 @@ public interface GridQueryIndexing {
      */
     public void stop() throws IgniteCheckedException;
 
+
+    /**
+     * Runs two step query.
+     *
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry);
+
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
index 6bc0235..e05c425 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
@@ -18,6 +18,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.future.*;
 import org.gridgain.grid.util.lang.*;
@@ -428,6 +429,22 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+        try {
+            return idx.queryTwoStep(qry);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * @param space Space.
      * @param key Key.
      * @throws IgniteCheckedException Thrown in case of any errors.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java b/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
deleted file mode 100644
index 1f6bbe4..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridQueue.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Queue which supports addition at tail and removing at head. This
- * queue also exposes its internal linked list nodes and allows for
- * constant time removal from the middle of the queue.
- * <p>
- * This queue is not thread-safe.
- */
-public class GridQueue<E> extends AbstractCollection<E> implements Queue<E> {
-    /** Queue size. */
-    private int size;
-
-    /** Modification count. */
-    private int modCnt;
-
-    /** Queue header. */
-    private Node<E> hdr = new Node<>(null, null, null);
-
-    /**
-     * Creates empty queue.
-     */
-    public GridQueue() {
-        hdr.next = hdr.prev = hdr;
-    }
-
-    /**
-     * Handles modification count check.
-     *
-     * @param match Modification count to match.
-     */
-    private void checkModCount(int match) {
-        if (modCnt != match)
-            throw new ConcurrentModificationException("Mod count mismatch [expected=" + match +
-                ", actual=" + modCnt + ']');
-
-        modCnt++;
-    }
-
-    /**
-     * Adds element before node.
-     *
-     * @param e Element to add.
-     * @param n Node.
-     * @return New node.
-     */
-    private Node<E> addBefore(E e, Node<E> n) {
-        A.notNull(e, "e");
-
-        assert n != null;
-
-        int match = modCnt;
-
-        Node<E> newNode = new Node<>(e, n, n.prev);
-
-        // Link.
-        newNode.prev.next = newNode;
-        newNode.next.prev = newNode;
-
-        size++;
-
-        checkModCount(match);
-
-        return newNode;
-    }
-
-    /**
-     * Removes node.
-     *
-     * @param n Node to remove.
-     * @return Removed value.
-     */
-    private E remove(Node<E> n) {
-        assert n != null;
-
-        if (n == hdr)
-            throw new NoSuchElementException();
-
-        assert !n.unlinked();
-
-        int match = modCnt;
-
-        E res = n.item;
-
-        // Relink.
-        n.prev.next = n.next;
-        n.next.prev = n.prev;
-
-        // GC.
-        n.next = n.prev = null;
-        n.item = null;
-
-        size--;
-
-        checkModCount(match);
-
-        n.unlink();
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean add(E e) {
-        offer(e);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean remove(Object o) {
-        A.notNull(o, "o");
-
-        for (Node<E> n = hdr.next; n != hdr; n = n.next) {
-            if (o.equals(n.item)) {
-                remove(n);
-
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean offer(E e) {
-        addBefore(e, hdr);
-
-        return true;
-    }
-
-    /**
-     * Same as {@link #offer(Object)}, but returns created node.
-     *
-     * @param e Element to add.
-     * @return New node.
-     */
-    public Node<E> offerx(E e) {
-        return addBefore(e, hdr);
-    }
-
-    /**
-     * Polls element from head of the queue.
-     *
-     * @return Polled element.
-     */
-    @Nullable @Override public E poll() {
-        if (size == 0)
-            return null;
-
-        return remove(hdr.next);
-    }
-
-    /** {@inheritDoc} */
-    @Override public E element() {
-        Node<E> n = hdr.next;
-
-        if (n == null)
-            throw new NoSuchElementException();
-
-        return n.item;
-    }
-
-    /** {@inheritDoc} */
-    @Override public E remove() {
-        E item = poll();
-
-        if (item == null)
-            throw new NoSuchElementException();
-
-        return item;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public E peek() {
-        return hdr.next.item;
-    }
-
-    /**
-     * @return Peeks at first node in the queue.
-     */
-    public Node<E> peekx() {
-        return hdr.next == hdr ? null : hdr.next;
-    }
-
-    /**
-     * Unlinks node from the queue.
-     *
-     * @param n Node to unlink.
-     */
-    public void unlink(Node<E> n) {
-        A.notNull(n, "n");
-
-        remove(n);
-    }
-
-    /**
-     * Gets queue size.
-     *
-     * @return Queue size.
-     */
-    @Override public int size() {
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Iterator<E> iterator() {
-        return new QueueIterator();
-    }
-
-    /**
-     * Node for internal linked list.
-     *
-     * @param <E> Queue element.
-     */
-    @SuppressWarnings( {"PublicInnerClass"})
-    public static class Node<E> {
-        /** Item. */
-        private E item;
-
-        /** Next. */
-        @GridToStringExclude
-        private Node<E> next;
-
-        /** Previous. */
-        @GridToStringExclude
-        private Node<E> prev;
-
-        /** Unlinked flag. */
-        private boolean unlinked;
-
-        /**
-         * @param item Item.
-         * @param next Next link.
-         * @param prev Previous link.
-         */
-        private Node(E item, Node<E> next, Node<E> prev) {
-            this.item = item;
-            this.next = next;
-            this.prev = prev;
-        }
-
-        /**
-         * Gets this node's item.
-         *
-         * @return This node's item.
-         */
-        public E item() {
-            return item;
-        }
-
-        /**
-         * Sets unlinked flag.
-         */
-        void unlink() {
-            assert !unlinked;
-
-            unlinked = true;
-        }
-
-        /**
-         * Checks if node is unlinked.
-         *
-         * @return {@code True} if node is unlinked.
-         */
-        public boolean unlinked() {
-            return unlinked;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Node.class, this);
-        }
-    }
-
-    /**
-     * Iterator.
-     */
-    private class QueueIterator implements Iterator<E> {
-        /** Next element. */
-        private Node<E> next;
-
-        /** Expected modification count. */
-        private int expModCnt = modCnt;
-
-        /**
-         *
-         */
-        QueueIterator() {
-            next = hdr.next;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return next != hdr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public E next() {
-            checkModCount();
-
-            if (next == null)
-                throw new NoSuchElementException();
-
-            E ret = next.item;
-
-            next = next.next;
-
-            return ret;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         * Checks modification count.
-         */
-        private void checkModCount() {
-            if (modCnt != expModCnt)
-                throw new ConcurrentModificationException("Mod count mismatch [expected=" + expModCnt +
-                    ", actual=" + modCnt + ']');
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
deleted file mode 100644
index 4b613f6..0000000
--- a/modules/core/src/test/java/org/gridgain/grid/util/GridQueueSelfTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.util;
-
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.testframework.junits.common.*;
-
-/**
- * Grid utils tests.
- */
-@GridCommonTest(group = "Utils")
-public class GridQueueSelfTest extends GridCommonAbstractTest {
-    /**
-     *
-     */
-    public void testQueue() {
-        GridQueue<String> q = new GridQueue<>();
-        for (char c = 'a'; c <= 'z'; c++)
-            q.offer(Character.toString(c));
-
-        assertEquals('z' - 'a' + 1, q.size());
-
-        char ch = 'a';
-
-        for (String c = q.poll(); c != null; c = q.poll()) {
-            X.println(c);
-
-            assertEquals(Character.toString(ch++), c);
-        }
-
-        assert q.isEmpty();
-
-        for (char c = 'A'; c <= 'Z'; c++)
-            q.offer(Character.toString(c));
-
-        assertEquals('Z' - 'A' + 1, q.size());
-
-        ch = 'A';
-
-        for (String s : q) {
-            X.println(s);
-
-            assertEquals(Character.toString(ch++), s);
-        }
-
-        q.remove("O");
-
-        assertEquals('Z' - 'A', q.size());
-
-        for (String c = q.poll(); c != null; c = q.poll())
-            assert !"O".equals(c);
-
-        assert q.isEmpty();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
index 7307676..7ee84f8 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
@@ -16,13 +16,14 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
-import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.kernal.processors.query.*;
 import org.gridgain.grid.kernal.processors.query.h2.opt.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.offheap.unsafe.*;
@@ -112,9 +113,6 @@ public class GridH2Indexing implements GridQueryIndexing {
     }
 
     /** */
-    private static final ThreadLocal<GridH2Indexing> localSpi = new ThreadLocal<>();
-
-    /** */
     private volatile String cachedSearchPathCmd;
 
     /** Cache for deserialized offheap rows. */
@@ -148,6 +146,12 @@ public class GridH2Indexing implements GridQueryIndexing {
     private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>());
 
     /** */
+    private GridMapQueryExecutor mapQryExec;
+
+    /** */
+    private GridReduceQueryExecutor rdcQryExec;
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -218,6 +222,15 @@ public class GridH2Indexing implements GridQueryIndexing {
     private volatile GridKernalContext ctx;
 
     /**
+     * @param space Space name, may be {@code null}.
+     * @return Connection returned by {@code connectionForThread} for the schema of the given space.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Connection connectionForSpace(@Nullable String space) throws IgniteCheckedException {
+        return connectionForThread(schema(space));
+    }
+
+    /**
      * Gets DB connection.
      *
      * @param schema Whether to set schema for connection or not.
@@ -370,22 +383,15 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (tbl == null)
             return; // Type was rejected.
 
-        localSpi.set(this);
+        removeKey(spaceName, k, tbl);
 
-        try {
-            removeKey(spaceName, k, tbl);
-
-            if (expirationTime == 0)
-                expirationTime = Long.MAX_VALUE;
+        if (expirationTime == 0)
+            expirationTime = Long.MAX_VALUE;
 
-            tbl.tbl.update(k, v, expirationTime);
+        tbl.tbl.update(k, v, expirationTime);
 
-            if (tbl.luceneIdx != null)
-                tbl.luceneIdx.store(k, v, ver, expirationTime);
-        }
-        finally {
-            localSpi.remove();
-        }
+        if (tbl.luceneIdx != null)
+            tbl.luceneIdx.store(k, v, ver, expirationTime);
     }
 
     /** {@inheritDoc} */
@@ -393,23 +399,16 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ']');
 
-        localSpi.set(this);
+        for (TableDescriptor tbl : tables(schema(spaceName))) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                if (tbl.tbl.update(key, null, 0)) {
+                    if (tbl.luceneIdx != null)
+                        tbl.luceneIdx.remove(key);
 
-        try {
-            for (TableDescriptor tbl : tables(schema(spaceName))) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    if (tbl.tbl.update(key, null, 0)) {
-                        if (tbl.luceneIdx != null)
-                            tbl.luceneIdx.remove(key);
-
-                        return;
-                    }
+                    return;
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /** {@inheritDoc} */
@@ -419,47 +418,33 @@ public class GridH2Indexing implements GridQueryIndexing {
         if (schema == null)
             return;
 
-        localSpi.set(this);
-
-        try {
-            for (TableDescriptor tbl : schema.values()) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    try {
-                        if (tbl.tbl.onSwap(key))
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteCheckedException(e);
-                    }
+        for (TableDescriptor tbl : schema.values()) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                try {
+                    if (tbl.tbl.onSwap(key))
+                        return;
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteCheckedException(e);
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /** {@inheritDoc} */
     @Override public void onUnswap(@Nullable String spaceName, Object key, Object val, byte[] valBytes)
         throws IgniteCheckedException {
-        localSpi.set(this);
-
-        try {
-            for (TableDescriptor tbl : tables(schema(spaceName))) {
-                if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
-                    try {
-                        if (tbl.tbl.onUnswap(key, val))
-                            return;
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteCheckedException(e);
-                    }
+        for (TableDescriptor tbl : tables(schema(spaceName))) {
+            if (tbl.type().keyClass().equals(key.getClass()) || !isIndexFixedTyping(spaceName)) {
+                try {
+                    if (tbl.tbl.onUnswap(key, val))
+                        return; // Stop at the first table whose onUnswap() returns true.
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteCheckedException(e); // NOTE(review): rewraps an exception of the same type.
                 }
             }
         }
-        finally {
-            localSpi.remove();
-        }
     }
 
     /**
@@ -540,8 +525,6 @@ public class GridH2Indexing implements GridQueryIndexing {
     @Override public <K, V> GridQueryFieldsResult queryFields(@Nullable final String spaceName, final String qry,
         @Nullable final Collection<Object> params, final GridIndexingQueryFilter filters)
         throws IgniteCheckedException {
-        localSpi.set(this);
-
         setFilters(filters);
 
         try {
@@ -575,8 +558,6 @@ public class GridH2Indexing implements GridQueryIndexing {
         }
         finally {
             setFilters(null);
-
-            localSpi.remove();
         }
     }
 
@@ -652,7 +633,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
+    public ResultSet executeSqlQueryWithTimer(Connection conn, String sql,
         @Nullable Collection<Object> params) throws IgniteCheckedException {
         long start = U.currentTimeMillis();
 
@@ -719,7 +700,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      * @param params Parameters collection.
      * @throws IgniteCheckedException If failed.
      */
-    private void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
+    public void bindParameters(PreparedStatement stmt, @Nullable Collection<Object> params) throws IgniteCheckedException {
         if (!F.isEmpty(params)) {
             int idx = 1;
 
@@ -751,8 +732,6 @@ public class GridH2Indexing implements GridQueryIndexing {
 
         setFilters(filters);
 
-        localSpi.set(this);
-
         try {
             ResultSet rs = executeQuery(qry, params, tbl);
 
@@ -760,11 +739,14 @@ public class GridH2Indexing implements GridQueryIndexing {
         }
         finally {
             setFilters(null);
-
-            localSpi.remove();
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) {
+        return rdcQryExec.query(qry); // Delegates to the GridReduceQueryExecutor held in rdcQryExec.
+    }
+
     /**
      * Sets filters for current thread. Must be set to not null value
      * before executeQuery and reset to null after in finally block since it signals
@@ -772,7 +754,7 @@ public class GridH2Indexing implements GridQueryIndexing {
      *
      * @param filters Filters.
      */
-    private void setFilters(@Nullable GridIndexingQueryFilter filters) {
+    public void setFilters(@Nullable GridIndexingQueryFilter filters) {
         GridH2IndexBase.setFiltersForThread(filters);
     }
 
@@ -1115,6 +1097,12 @@ public class GridH2Indexing implements GridQueryIndexing {
 
             for (GridCacheConfiguration cacheCfg : ctx.config().getCacheConfiguration())
                 registerSpace(cacheCfg.getName());
+
+            mapQryExec = new GridMapQueryExecutor();
+            rdcQryExec = new GridReduceQueryExecutor();
+
+            mapQryExec.start(ctx, this);
+            rdcQryExec.start(ctx, this);
         }
 
         System.setProperty("h2.serializeJavaObject", "false");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
index 61f1190..90aa454 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2ResultSetIterator.java
@@ -10,6 +10,7 @@
 package org.gridgain.grid.kernal.processors.query.h2;
 
 import org.apache.ignite.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
@@ -20,7 +21,7 @@ import java.util.*;
 /**
  * Iterator over result set.
  */
-abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
+public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAdapter<T> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
index 00cb06d..4dcfd73 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/opt/GridH2IndexBase.java
@@ -49,7 +49,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param fs Filters.
      */
     public static void setFiltersForThread(GridIndexingQueryFilter fs) {
-        filters.set(fs);
+        if (fs == null)
+            filters.remove(); // A null argument removes the stored filters instead of storing null.
+        else
+            filters.set(fs);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
new file mode 100644
index 0000000..b86caf1
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -0,0 +1,263 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.indexing.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.typedef.*;
+import org.h2.jdbc.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.lang.reflect.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Map query executor: executes received query requests via H2 indexing and replies with result pages.
+ */
+public class GridMapQueryExecutor {
+    /** Reflective handle to the {@code result} field declared by {@link JdbcResultSet}. */
+    private static final Field RESULT_FIELD;
+
+    /**
+     * Looks up {@link #RESULT_FIELD} and makes it accessible; throws if the field does not exist.
+     */
+    static {
+        try {
+            RESULT_FIELD = JdbcResultSet.class.getDeclaredField("result");
+
+            RESULT_FIELD.setAccessible(true);
+        }
+        catch (NoSuchFieldException e) {
+            throw new IllegalStateException("Check H2 version in classpath.", e);
+        }
+    }
+
+    /** Logger, obtained in {@code start}. */
+    private IgniteLogger log;
+
+    /** Kernal context, set in {@code start}. */
+    private GridKernalContext ctx;
+
+    /** H2 indexing used to run the queries, set in {@code start}. */
+    private GridH2Indexing h2;
+
+    /** Query results by node ID, then by query request ID. NOTE(review): no entry is ever removed in this class. */
+    private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Kernal context; a listener for {@code GridTopic.TOPIC_QUERY} is registered through {@code ctx.io()}.
+     * @param h2 H2 Indexing.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.h2 = h2;
+
+        log = ctx.log(GridMapQueryExecutor.class);
+
+        // TODO handle node failures.
+
+        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
+            @Override public boolean apply(UUID nodeId, Object msg) {
+                assert msg != null;
+
+                ClusterNode node = ctx.discovery().node(nodeId); // NOTE(review): used below without a null check.
+
+                if (msg instanceof GridQueryRequest)
+                    executeLocalQuery(node, (GridQueryRequest)msg);
+                else if (msg instanceof GridNextPageRequest)
+                    sendNextPage(node, (GridNextPageRequest)msg);
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param node Node to which result pages or an error are sent.
+     * @param req Query request carrying the request ID, the page size and the queries to run.
+     */
+    private void executeLocalQuery(ClusterNode node, GridQueryRequest req) {
+        h2.setFilters(new GridIndexingQueryFilter() {
+            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
+                final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
+
+                if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+                    return null; // No predicate for replicated caches or caches configured with zero backups.
+
+                return new IgniteBiPredicate<K, V>() {
+                    @Override public boolean apply(K k, V v) {
+                        return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
+                    }
+                };
+            }
+        });
+
+        try {
+            QueryResults qr = new QueryResults(req.requestId(), req.queries().size());
+
+            ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+            if (nodeRess == null) {
+                nodeRess = new ConcurrentHashMap8<>();
+
+                ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(node.id(), nodeRess);
+
+                if (old != null)
+                    nodeRess = old;
+            }
+
+            QueryResults old = nodeRess.putIfAbsent(req.requestId(), qr);
+
+            assert old == null; // A request ID is expected to be registered only once per node.
+
+            // Prepare snapshots for all the needed tables before actual run.
+            for (GridCacheSqlQuery qry : req.queries()) {
+                // TODO
+            }
+
+            // Run queries.
+            int i = 0;
+
+            for (GridCacheSqlQuery qry : req.queries()) {
+                ResultSet rs = h2.executeSqlQueryWithTimer(h2.connectionForSpace(null), qry.query(),
+                    F.asList(qry.parameters()));
+
+                assert rs instanceof JdbcResultSet : rs.getClass();
+
+                ResultInterface res = (ResultInterface)RESULT_FIELD.get(rs); // Read the field value via reflection.
+
+                qr.results[i] = res;
+                qr.resultSets[i] = rs;
+
+                // Send the first page.
+                sendNextPage(node, qr, i, req.pageSize(), res.getRowCount());
+
+                i++;
+            }
+        }
+        catch (Throwable e) { // NOTE(review): also catches Errors; the failure is reported to the node, not rethrown.
+            sendError(node, req.requestId(), e);
+        }
+        finally {
+            h2.setFilters(null);
+        }
+    }
+
+    /**
+     * @param node Node to notify.
+     * @param qryReqId Query request ID.
+     * @param err Error to wrap into a {@code GridQueryFailResponse}.
+     */
+    private void sendError(ClusterNode node, long qryReqId, Throwable err) {
+        try {
+            ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err));
+        }
+        catch (IgniteCheckedException e) {
+            e.addSuppressed(err); // Keep the original failure attached to the send failure.
+
+            log.error("Failed to send error message.", e);
+        }
+    }
+
+    /**
+     * @param node Node to send the page (or an error, if no stored result is found) to.
+     * @param req Next page request identifying the query request, the query index and the page size.
+     */
+    private void sendNextPage(ClusterNode node, GridNextPageRequest req) {
+        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+
+        QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
+
+        if (qr == null)
+            sendError(node, req.queryRequestId(),
+                new IllegalStateException("No query result found for request: " + req));
+        else
+            sendNextPage(node, qr, req.query(), req.pageSize(), -1); // -1 is passed as the all-rows count here.
+    }
+
+    /**
+     * @param node Destination node.
+     * @param qr Stored query results.
+     * @param qry Index of the query within {@code qr}.
+     * @param pageSize Maximum number of rows to put into the page.
+     * @param allRows All rows count to report in the response ({@code -1} is passed for next-page requests).
+     */
+    private void sendNextPage(ClusterNode node, QueryResults qr, int qry, int pageSize, int allRows) {
+        int page;
+
+        List<Value[]> rows = new ArrayList<>(Math.min(64, pageSize));
+
+        ResultInterface res = qr.results[qry];
+
+        assert res != null;
+
+        synchronized (res) { // Page counter and result position are advanced together under the result's monitor.
+            page = qr.pages[qry]++;
+
+            for (int i = 0 ; i < pageSize; i++) {
+                if (!res.next())
+                    break;
+
+                rows.add(res.currentRow());
+            }
+        }
+
+        try {
+            ctx.io().sendUserMessage(F.asList(node), new GridNextPageResponse(qr.qryReqId, qry, page, allRows, rows));
+        }
+        catch (IgniteCheckedException e) {
+            log.error("Failed to send message.", e);
+
+            throw new IgniteException(e); // NOTE(review): executeLocalQuery catches this and calls sendError.
+        }
+    }
+
+    /**
+     * Per-request state: H2 results, their JDBC result sets and page counters, one slot per query.
+     */
+    private static class QueryResults {
+        /** Query request ID. */
+        private long qryReqId;
+
+        /** H2 results, indexed by query. */
+        private ResultInterface[] results;
+
+        /** JDBC result sets, indexed by query. NOTE(review): never closed in this class. */
+        private ResultSet[] resultSets;
+
+        /** Number of pages already handed out, indexed by query. */
+        private int[] pages;
+
+        /**
+         * @param qryReqId Query request ID.
+         * @param qrys Number of queries; used as the length of every array.
+         */
+        private QueryResults(long qryReqId, int qrys) {
+            this.qryReqId = qryReqId;
+
+            results = new ResultInterface[qrys];
+            resultSets = new ResultSet[qrys];
+            pages = new int[qrys];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
index 163256b..16ba15d 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndex.java
@@ -9,7 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
-import org.gridgain.grid.*;
+import org.apache.ignite.*;
 import org.h2.engine.*;
 import org.h2.index.*;
 import org.h2.message.*;
@@ -18,31 +18,23 @@ import org.h2.table.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
  * Merge index.
  */
-public class GridMergeIndex extends BaseIndex {
+public abstract class GridMergeIndex extends BaseIndex {
     /** */
-    private static final int MAX_CAPACITY = 100_000;
-
-    /** */
-    private static final Collection<Row> END = new ArrayList<>(0);
-
-    /** */
-    private Collection<Collection<Row>> fetchedRows = new LinkedBlockingQueue<>();
-
-    /** */
-    private BlockingQueue<Collection<Row>> cursorRows = new LinkedBlockingQueue<>();
-
-    /** */
-    private int fetchedCnt;
+    private static final int MAX_FETCH_SIZE = 100000;
 
     /** */
     private final AtomicInteger cnt = new AtomicInteger(0);
 
+    /**
+     * Will be r/w from query execution thread only, does not need to be threadsafe.
+     */
+    private ArrayList<Row> fetched = new ArrayList<>();
+
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
         return cnt.get();
@@ -61,12 +53,36 @@ public class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param rows0 Rows.
+     * @param page Page.
      */
-    public void addRows(Collection<Row> rows0) {
-        assert !rows0.isEmpty();
+    public abstract void addPage(GridResultPage<?> page);
+
+    /** {@inheritDoc} */
+    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+        if (fetched == null) // FetchingCursor nulls this field once it holds MAX_FETCH_SIZE rows.
+            throw new IgniteException("Fetched result set was too large.");
 
+        if (fetched.size() == cnt.get())  // We've fetched all the rows.
+            return findAllFetched(fetched, first, last);
 
+        return findInStream(first, last);
+    }
+
+    /**
+     * @param first First row, may be {@code null}.
+     * @param last Last row, may be {@code null}.
+     * @return Cursor. Usually it must be a {@link FetchingCursor} instance.
+     */
+    protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
+
+    /**
+     * @param fetched Fetched rows.
+     * @param first First row (not used by this implementation).
+     * @param last Last row (not used by this implementation).
+     * @return Cursor over all the given fetched rows.
+     */
+    protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+        return new IteratorCursor(fetched.iterator());
     }
 
     /** {@inheritDoc} */
@@ -90,14 +106,6 @@ public class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
-        if (fetchedRows == null)
-            throw new GridRuntimeException("Rows were dropped out of result set.");
-
-        return new Cursor0();
-    }
-
-    /** {@inheritDoc} */
     @Override public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
         return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
     }
@@ -132,12 +140,24 @@ public class GridMergeIndex extends BaseIndex {
         return 0;
     }
 
-    private class Cursor0 implements Cursor {
+    /**
+     * Cursor over iterator.
+     */
+    protected class IteratorCursor implements Cursor {
         /** */
-        private Row cur;
+        private Iterator<Row> iter;
 
         /** */
-        private Iterator<Row> curIter;
+        protected Row cur;
+
+        /**
+         * @param iter Iterator.
+         */
+        public IteratorCursor(Iterator<Row> iter) {
+            assert iter != null;
+
+            this.iter = iter;
+        }
 
         /** {@inheritDoc} */
         @Override public Row get() {
@@ -151,7 +171,9 @@ public class GridMergeIndex extends BaseIndex {
 
         /** {@inheritDoc} */
         @Override public boolean next() {
-            return false;
+            cur = iter.hasNext() ? iter.next() : null;
+
+            return cur != null;
         }
 
         /** {@inheritDoc} */
@@ -159,4 +181,51 @@ public class GridMergeIndex extends BaseIndex {
             throw DbException.getUnsupportedException("previous");
         }
     }
+
+    /**
+     * Cursor that first replays already fetched rows and then pulls further rows through {@link #fetchNext()}.
+     */
+    protected abstract class FetchingCursor extends IteratorCursor {
+        /** Cleared once {@link #fetchNext()} has returned {@code null}. */
+        private boolean canFetch = true;
+
+        /** Iterates the rows already held in {@code fetched}, or nothing when {@code fetched} is {@code null}.
+         */
+        public FetchingCursor() {
+            super(fetched == null ? Collections.<Row>emptyIterator() : fetched.iterator());
+        }
+
+        /**
+         * @return Next row or {@code null} if none available.
+         */
+        @Nullable protected abstract Row fetchNext();
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (super.next())
+                return true;
+
+            if (!canFetch)
+                return false;
+
+            cur = fetchNext();
+
+            if (cur == null) { // No more results to fetch.
+                assert fetched == null || fetched.size() == cnt.get() : fetched.size() + " <> " + cnt.get();
+
+                canFetch = false;
+
+                return false;
+            }
+
+            if (fetched != null) { // Try to reuse fetched result.
+                fetched.add(cur); // NOTE(review): appends while super's fetched.iterator() is live (fail-fast) - verify.
+
+                if (fetched.size() == MAX_FETCH_SIZE)
+                    fetched = null; // Throw away fetched result if it is too large.
+            }
+
+            return true;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
new file mode 100644
index 0000000..c6aaea9
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -0,0 +1,74 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.h2.index.*;
+import org.h2.result.*;
+import org.h2.value.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Unsorted merge index: streams rows from queued result pages in the order the pages were added.
+ */
+public class GridMergeIndexUnsorted extends GridMergeIndex {
+    /** Pages added via {@link #addPage}, consumed in FIFO order. */
+    private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+
+    /** {@inheritDoc} */
+    @Override public void addPage(GridResultPage<?> page) {
+        queue.add(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
+        final GridResultPage<?> p = queue.poll();
+
+        assert p != null; // First page must be already fetched.
+
+        if (p.isEmpty())
+            return new IteratorCursor(Collections.<Row>emptyIterator());
+
+        p.fetchNextPage(); // We always request next page before reading this one.
+
+        return new FetchingCursor() {
+            /** Iterator over the rows of the page currently being read. */
+            Iterator<Value[]> iter = p.rows().iterator();
+
+            @Nullable @Override protected Row fetchNext() {
+                if (!iter.hasNext()) {
+                    GridResultPage<?> page;
+
+                    try {
+                        page = queue.take(); // Blocks until another page has been added.
+                    }
+                    catch (InterruptedException e) { // NOTE(review): the thread's interrupt flag is not restored.
+                        throw new IgniteException("Query execution was interrupted.", e);
+                    }
+
+                    if (page.isEmpty()) {
+                        assert queue.isEmpty() : "It must be the last page.";
+
+                        return null; // Empty page - we are done.
+                    }
+
+                    page.fetchNextPage();
+
+                    iter = page.rows().iterator();
+                }
+
+                return new Row(iter.next(), 0);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
index 3f059fa..a1f2213 100644
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridMergeTable.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.query.h2.twostep;
 
+import org.h2.api.*;
 import org.h2.command.ddl.*;
 import org.h2.engine.*;
 import org.h2.index.*;
@@ -26,7 +27,7 @@ public class GridMergeTable extends TableBase {
     private final ArrayList<Index> idxs = new ArrayList<>(1);
 
     /** */
-    private final GridMergeIndex idx = new GridMergeIndex();
+    private final GridMergeIndex idx = new GridMergeIndexUnsorted();
 
     /**
      * @param data Data.
@@ -142,4 +143,34 @@ public class GridMergeTable extends TableBase {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
+
+    /**
+     * H2 table engine that creates {@link GridMergeTable} instances.
+     * <p>
+     * The table built by {@link #createTable(CreateTableData)} is parked in a thread-local slot so
+     * that the code which issued the {@code CREATE TABLE} statement can pick it up on the same
+     * thread via {@link #getCreated()}.
+     */
+    public static class Engine implements TableEngine {
+        /** Hand-off slot for the table most recently created on the current thread. */
+        private static final ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>();
+
+        /**
+         * Returns the table created on the current thread and clears the hand-off slot, so each
+         * created table can be retrieved only once.
+         *
+         * @return Created table.
+         */
+        public static GridMergeTable getCreated() {
+            GridMergeTable tbl = createdTbl.get();
+
+            assert tbl != null;
+
+            createdTbl.remove();
+
+            return tbl;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Table createTable(CreateTableData data) {
+            GridMergeTable tbl = new GridMergeTable(data);
+
+            // Remember the instance for the subsequent getCreated() call on this thread.
+            createdTbl.set(tbl);
+
+            return tbl;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
deleted file mode 100644
index d550b3b..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageRequest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * Request to fetch next page.
- */
-public class GridNextPageRequest extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private long qryId;
-
-    /** */
-    private int qry;
-
-    /** */
-    private int offset;
-
-    /** */
-    private int pageSize;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
deleted file mode 100644
index d77215c..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridNextPageResponse.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-import org.h2.value.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridNextPageResponse extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private Collection<Value[]> rows;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
deleted file mode 100644
index 10e30ee..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryAck.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryAck extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
deleted file mode 100644
index 7a664fd..0000000
--- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridQueryRequest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.query.h2.twostep;
-
-import org.gridgain.grid.util.direct.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- * TODO write doc
- */
-public class GridQueryRequest extends GridTcpCommunicationMessageAdapter {
-    /** */
-    private long reqId;
-
-    /** */
-    private List<String> sqlQrys;
-
-    /** */
-    private List<Collection<Object>> params;
-
-
-    @Override public boolean writeTo(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public boolean readFrom(ByteBuffer buf) {
-        return false;
-    }
-
-    @Override public byte directType() {
-        return 0;
-    }
-
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        return null;
-    }
-
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
new file mode 100644
index 0000000..3e7e12c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -0,0 +1,199 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.kernal.*;
+import org.gridgain.grid.kernal.managers.communication.*;
+import org.gridgain.grid.kernal.processors.cache.query.*;
+import org.gridgain.grid.kernal.processors.query.h2.*;
+import org.gridgain.grid.kernal.processors.query.h2.twostep.messages.*;
+import org.gridgain.grid.util.future.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Reduce query executor.
+ * <p>
+ * Sends the map queries of a two-step query to the cluster nodes, collects the returned result
+ * pages into per-query merge tables and then runs the reduce query over those tables.
+ */
+public class GridReduceQueryExecutor {
+    /** Kernal context, assigned in {@link #start(GridKernalContext, GridH2Indexing)}. */
+    private GridKernalContext ctx;
+
+    /** H2 indexing, assigned in {@link #start(GridKernalContext, GridH2Indexing)}. */
+    private GridH2Indexing h2;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Generator of query request IDs. */
+    private final AtomicLong reqIdGen = new AtomicLong();
+
+    /** Registered query runs keyed by query request ID. */
+    private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+
+    /**
+     * @param ctx Context.
+     * @param h2 H2 Indexing.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start(final GridKernalContext ctx, GridH2Indexing h2) throws IgniteCheckedException {
+        this.ctx = ctx;
+        this.h2 = h2;
+
+        log = ctx.log(GridReduceQueryExecutor.class);
+
+        // TODO handle node failure.
+
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg != null;
+
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (msg instanceof GridNextPageResponse)
+                    onNextPage(node, (GridNextPageResponse)msg);
+                else if (msg instanceof GridQueryFailResponse)
+                    onFail(node, (GridQueryFailResponse)msg);
+            }
+        });
+    }
+
+    /**
+     * Handles a failure response from a node. Currently the error is only logged.
+     *
+     * @param node Node that sent the response.
+     * @param msg Failure response.
+     */
+    private void onFail(ClusterNode node, GridQueryFailResponse msg) {
+        // TODO the failed run's latch is not released here, so the caller of query() keeps waiting.
+        U.error(log, "Failed to execute query.", msg.error());
+    }
+
+    /**
+     * Handles a result page: adds it to the merge index of the corresponding map query and, for
+     * the first page of a query, records the total row count and counts down the run latch.
+     *
+     * @param node Node that sent the page.
+     * @param msg Page response.
+     */
+    private void onNextPage(final ClusterNode node, GridNextPageResponse msg) {
+        final long qryReqId = msg.queryRequestId();
+        final int qry = msg.query();
+        final int pageSize = msg.rows().size();
+
+        if (node == null) {
+            // Sender is not resolvable through discovery; a follow-up page could not be requested from it.
+            U.warn(log, "Ignoring result page from unknown node [qryReqId=" + qryReqId + ", qry=" + qry + ']');
+
+            return;
+        }
+
+        QueryRun r = runs.get(qryReqId);
+
+        if (r == null) {
+            // No such run is registered (for example it was removed after a failure in query()).
+            U.warn(log, "Ignoring result page for unknown query request [qryReqId=" + qryReqId +
+                ", qry=" + qry + ']');
+
+            return;
+        }
+
+        GridMergeIndex idx = r.tbls.get(qry).getScanIndex(null);
+
+        idx.addPage(new GridResultPage<Object>(node.id(), qry, msg.rows()) {
+            @Override public void fetchNextPage() {
+                try {
+                    // The next page is requested with the size of the page just received.
+                    ctx.io().sendUserMessage(F.asList(node), new GridNextPageRequest(qryReqId, qry, pageSize));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+        if (msg.allRows() != -1) { // Only the first page contains row count.
+            idx.addCount(msg.allRows());
+
+            r.latch.countDown();
+        }
+    }
+
+    /**
+     * Executes a two-step query: creates a merge table per map query, sends the map queries to
+     * the cluster nodes, waits for the first page of every map query from every node and then
+     * executes the reduce query.
+     *
+     * @param qry Two-step query.
+     * @return Finished future holding either the reduce query result or the failure.
+     */
+    public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
+        long qryReqId = reqIdGen.incrementAndGet();
+
+        QueryRun r = new QueryRun();
+
+        r.tbls = new ArrayList<>();
+
+        try {
+            r.conn = h2.connectionForSpace(null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        for (GridCacheSqlQuery mapQry : qry.mapQueries())
+            r.tbls.add(createTable(r.conn, mapQry));
+
+        Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow?
+
+        // One first page is expected per map query from every node.
+        r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+
+        runs.put(qryReqId, r);
+
+        try {
+            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, qry.mapQueries())); // TODO conf page size
+
+            r.latch.await();
+
+            GridCacheSqlQuery rdc = qry.reduceQuery();
+
+            final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters()));
+
+            // The run stays registered on success: further pages are requested while the result is read.
+            return new GridFinishedFuture<GridCacheSqlResult>(ctx, new Iter(res));
+        }
+        catch (IgniteCheckedException | InterruptedException e) {
+            // Failed run will not be consumed, do not keep it registered.
+            runs.remove(qryReqId);
+
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt(); // Preserve interrupt status for the caller.
+
+            return new GridFinishedFuture<>(ctx, e);
+        }
+    }
+
+    /**
+     * Creates a local temporary merge table whose columns match the result of the given map query.
+     *
+     * @param conn Connection.
+     * @param qry Map query.
+     * @return Created merge table.
+     */
+    private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) {
+        try {
+            // NOTE(review): alias and query text are concatenated into the DDL statement, they
+            // must never originate from untrusted input.
+            try (PreparedStatement s = conn.prepareStatement(
+                "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
+                " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " +
+                " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) {
+                h2.bindParameters(s, F.asList(qry.parameters()));
+
+                s.execute();
+            }
+
+            // The engine hands the created table over through a thread-local slot.
+            return GridMergeTable.Engine.getCreated();
+        }
+        catch (SQLException | IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * State of a single query run.
+     */
+    private static class QueryRun {
+        /** Merge tables, one per map query, in map query order. */
+        private List<GridMergeTable> tbls;
+
+        /** Latch counted down once per received first page. */
+        private CountDownLatch latch;
+
+        /** Connection the merge tables were created on and the reduce query is executed on. */
+        private Connection conn;
+    }
+
+    /**
+     * Reduce query result iterator producing every row as a list of column values.
+     */
+    private static class Iter extends GridH2ResultSetIterator<List<?>> implements GridCacheSqlResult {
+        /**
+         * @param data Data array.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Iter(ResultSet data) throws IgniteCheckedException {
+            super(data);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected List<?> createRow() {
+            ArrayList<Object> res = new ArrayList<>(row.length);
+
+            Collections.addAll(res, row);
+
+            return res;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
new file mode 100644
index 0000000..6c6ab6a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridResultPage.java
@@ -0,0 +1,76 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep;
+
+import org.h2.result.*;
+import org.h2.value.*;
+
+import java.util.*;
+
+/**
+ * Single page of a query result together with the source it was received from.
+ *
+ * @param <S> Type of the result source.
+ */
+public abstract class GridResultPage<S> {
+    /** Source the page came from. */
+    private final S src;
+
+    /** Page. */
+    private final int page;
+
+    /** Rows contained in the page. */
+    private final Collection<Value[]> rows;
+
+    /**
+     * @param src Source.
+     * @param page Page.
+     * @param rows Page rows.
+     */
+    protected GridResultPage(S src, int page, Collection<Value[]> rows) {
+        assert src != null;
+        assert rows != null;
+
+        this.rows = rows;
+        this.page = page;
+        this.src = src;
+    }
+
+    /**
+     * Asks the source for the page that follows this one.
+     */
+    public abstract void fetchNextPage();
+
+    /**
+     * @return Source of this result page.
+     */
+    public S source() {
+        return src;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return Rows of this page.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
+    }
+
+    /**
+     * @return {@code true} If the page contains no rows.
+     */
+    public boolean isEmpty() {
+        return rows.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
new file mode 100644
index 0000000..e1eb905
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageRequest.java
@@ -0,0 +1,59 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+
+import java.io.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridNextPageRequest implements Serializable {
+    /** Explicit serial version, so that the serialized form does not depend on compiler-generated defaults. */
+    private static final long serialVersionUID = 0L;
+
+    /** Query request ID. */
+    private final long qryReqId;
+
+    /** Query. */
+    private final int qry;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param pageSize Page size.
+     */
+    public GridNextPageRequest(long qryReqId, int qry, int pageSize) {
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
new file mode 100644
index 0000000..de38172
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridNextPageResponse.java
@@ -0,0 +1,149 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import org.h2.store.*;
+import org.h2.value.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Next page response.
+ * <p>
+ * Rows are serialized with the H2 {@link Data} buffer: the header fields and the row count are
+ * followed (for a non-empty page) by the column count, the payload size and the payload bytes.
+ */
+public class GridNextPageResponse implements Externalizable {
+    /** Query request ID. */
+    private long qryReqId;
+
+    /** Query. */
+    private int qry;
+
+    /** Page. */
+    private int page;
+
+    /** All rows count. */
+    private int allRows;
+
+    /** Rows. */
+    private Collection<Value[]> rows;
+
+    /**
+     * Required by {@link Externalizable}: deserialization instantiates the class through a public
+     * no-arg constructor before calling {@link #readExternal(ObjectInput)}.
+     */
+    public GridNextPageResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param page Page.
+     * @param allRows All rows count.
+     * @param rows Rows.
+     */
+    public GridNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) {
+        assert rows != null;
+
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.page = page;
+        this.allRows = allRows;
+        this.rows = rows;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return All rows.
+     */
+    public int allRows() {
+        return allRows;
+    }
+
+    /**
+     * @return Rows.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(qryReqId);
+        out.writeInt(qry);
+        out.writeInt(page);
+        out.writeInt(allRows);
+
+        out.writeInt(rows.size());
+
+        if (rows.isEmpty())
+            return;
+
+        // Column count is written once, taken from the first row.
+        out.writeInt(rows.iterator().next().length);
+
+        Data data = Data.create(null, 512);
+
+        for (Value[] row : rows) {
+            for (Value val : row) {
+                // Grow the buffer when needed: the initial 512 bytes are only a starting capacity.
+                data.checkCapacity(data.getValueLen(val));
+
+                data.writeValue(val);
+            }
+        }
+
+        out.writeInt(data.length());
+        out.write(data.getBytes(), 0, data.length());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        qryReqId = in.readLong();
+        qry = in.readInt();
+        page = in.readInt();
+        allRows = in.readInt();
+
+        int rowCnt = in.readInt();
+
+        if (rowCnt == 0)
+            rows = Collections.emptyList();
+        else {
+            rows = new ArrayList<>(rowCnt);
+
+            int cols = in.readInt();
+            int dataSize = in.readInt();
+
+            Data data = Data.create(null, dataSize);
+
+            // Load the serialized values into the buffer before decoding them.
+            in.readFully(data.getBytes(), 0, dataSize);
+
+            for (int r = 0; r < rowCnt; r++) {
+                Value[] row = new Value[cols];
+
+                for (int c = 0; c < cols; c++)
+                    row[c] = data.readValue();
+
+                rows.add(row);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/193d9b32/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
new file mode 100644
index 0000000..fe55114
--- /dev/null
+++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/messages/GridQueryAck.java
@@ -0,0 +1,34 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.query.h2.twostep.messages;
+
+import java.io.*;
+
+/**
+ * Query acknowledgement message carrying the ID of the request it refers to.
+ */
+public class GridQueryAck implements Serializable {
+    /** Explicit serial version, so that the serialized form does not depend on compiler-generated defaults. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private final long reqId;
+
+    /**
+     * @param reqId Request ID.
+     */
+    public GridQueryAck(long reqId) {
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+}


Mime
View raw message