ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/11] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Fri, 22 Jul 2016 14:08:45 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 6a079f0..04449ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -59,11 +60,13 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
@@ -71,11 +74,14 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
@@ -92,6 +98,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
 
 /**
  * Reduce query executor.
@@ -101,6 +108,9 @@ public class GridReduceQueryExecutor {
     public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL;
 
     /** */
+    private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0");
+
+    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -149,6 +159,13 @@ public class GridReduceQueryExecutor {
     /** */
     private final GridSpinBusyLock busyLock;
 
+    /** */
+    private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode locNode, Message msg) {
+            h2.mapQueryExecutor().onMessage(locNode.id(), msg);
+        }
+    };
+
     /**
      * @param busyLock Busy lock.
      */
@@ -173,6 +190,9 @@ public class GridReduceQueryExecutor {
                     return;
 
                 try {
+                    if (msg instanceof GridCacheQueryMarshallable)
+                        ((GridCacheQueryMarshallable)msg).unmarshall(ctx.config().getMarshaller(), ctx);
+
                     GridReduceQueryExecutor.this.onMessage(nodeId, msg);
                 }
                 finally {
@@ -339,13 +359,13 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return {@code true} If preloading is active.
      */
-    private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+    private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) {
         if (hasMovingPartitions(cctx))
             return true;
 
         if (extraSpaces != null) {
-            for (String extraSpace : extraSpaces) {
-                if (hasMovingPartitions(cacheContext(extraSpace)))
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                if (hasMovingPartitions(cacheContext(extraSpaces.get(i))))
                     return true;
             }
         }
@@ -357,7 +377,7 @@ public class GridReduceQueryExecutor {
      * @param cctx Cache context.
      * @return {@code true} If cache context
      */
-    private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
+    private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
         GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
 
         for (GridDhtPartitionMap2 map : fullMap.values()) {
@@ -369,34 +389,34 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param name Cache name.
+     * @param cacheId Cache ID.
      * @return Cache context.
      */
-    private GridCacheContext<?,?> cacheContext(String name) {
-        return ctx.cache().internalCache(name).context();
+    private GridCacheContext<?,?> cacheContext(Integer cacheId) {
+        return ctx.cache().context().cacheContext(cacheId);
     }
 
     /**
      * @param topVer Topology version.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
-     * @return Data nodes or {@code null} if repartitioning started and we need to retry..
+     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
     private Collection<ClusterNode> stableDataNodes(
         AffinityTopologyVersion topVer,
-        final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces
+        final GridCacheContext<?, ?> cctx,
+        List<Integer> extraSpaces
     ) {
-        String space = cctx.name();
-
         Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes());
 
         if (F.isEmpty(nodes))
-            throw new CacheException("Failed to find data nodes for cache: " + space);
+            throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
 
         if (!F.isEmpty(extraSpaces)) {
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+
+                String extraSpace = extraCctx.name();
 
                 if (extraCctx.isLocal())
                     continue; // No consistency guaranties for local caches.
@@ -448,10 +468,16 @@ public class GridReduceQueryExecutor {
     /**
      * @param cctx Cache context.
      * @param qry Query.
-     * @param keepBinary Keep binary.
+     * @param keepPortable Keep portable.
+     * @param enforceJoinOrder Enforce join order of tables.
      * @return Cursor.
      */
-    public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepBinary) {
+    public Iterator<List<?>> query(
+        GridCacheContext<?, ?> cctx,
+        GridCacheTwoStepQuery qry,
+        boolean keepPortable,
+        boolean enforceJoinOrder
+    ) {
         for (int attempt = 0;; attempt++) {
             if (attempt != 0) {
                 try {
@@ -464,21 +490,15 @@ public class GridReduceQueryExecutor {
                 }
             }
 
-            long qryReqId = reqIdGen.incrementAndGet();
+            final long qryReqId = reqIdGen.incrementAndGet();
 
-            QueryRun r = new QueryRun();
+            final String space = cctx.name();
 
-            r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
-
-            r.idxs = new ArrayList<>(qry.mapQueries().size());
-
-            String space = cctx.name();
-
-            r.conn = (JdbcConnection)h2.connectionForSpace(space);
+            final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize());
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-            List<String> extraSpaces = extraSpaces(space, qry.spaces());
+            List<Integer> extraSpaces = qry.extraCaches();
 
             Collection<ClusterNode> nodes;
 
@@ -529,13 +549,12 @@ public class GridReduceQueryExecutor {
 
                     idx = tbl.getScanIndex(null);
 
-                    fakeTable(r.conn, tblIdx++).setInnerTable(tbl);
+                    fakeTable(r.conn, tblIdx++).innerTable(tbl);
                 }
                 else
                     idx = GridMergeIndexUnsorted.createDummy(ctx);
 
-                for (ClusterNode node : nodes)
-                    idx.addSource(node.id());
+                idx.setSources(nodes);
 
                 r.idxs.add(idx);
             }
@@ -548,10 +567,10 @@ public class GridReduceQueryExecutor {
                 if (ctx.clientDisconnected()) {
                     throw new CacheException("Query was cancelled, client node disconnected.",
                         new IgniteClientDisconnectedException(ctx.cluster().clientReconnectFuture(),
-                        "Client node disconnected."));
+                            "Client node disconnected."));
                 }
 
-                Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+                List<GridCacheSqlQuery> mapQrys = qry.mapQueries();
 
                 if (qry.explain()) {
                     mapQrys = new ArrayList<>(qry.mapQueries().size());
@@ -560,17 +579,37 @@ public class GridReduceQueryExecutor {
                         mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
                 }
 
-                if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
-                    Marshaller m = ctx.config().getMarshaller();
+                boolean retry = false;
 
-                    for (GridCacheSqlQuery mapQry : mapQrys)
-                        mapQry.marshallParams(m);
-                }
+                IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer);
 
-                boolean retry = false;
+                final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
+                final boolean distributedJoins = qry.distributedJoins();
+
+                if (oldStyle && distributedJoins)
+                    throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
 
                 if (send(nodes,
-                    new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
+                    oldStyle ?
+                        new GridQueryRequest(qryReqId,
+                            r.pageSize,
+                            space,
+                            mapQrys,
+                            topVer,
+                            extraSpaces(space, qry.spaces()),
+                            null) :
+                        new GridH2QueryRequest()
+                            .requestId(qryReqId)
+                            .topologyVersion(topVer)
+                            .pageSize(r.pageSize)
+                            .caches(qry.caches())
+                            .tables(distributedJoins ? qry.tables() : null)
+                            .partitions(convert(partsMap))
+                            .queries(mapQrys)
+                            .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0),
+                    oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
+                    distributedJoins)
+                ) {
                     awaitAllReplies(r, nodes);
 
                     Object state = r.state.get();
@@ -599,9 +638,6 @@ public class GridReduceQueryExecutor {
                 Iterator<List<?>> resIter = null;
 
                 if (!retry) {
-                    if (qry.explain())
-                        return explainPlan(r.conn, space, qry);
-
                     if (skipMergeTbl) {
                         List<List<?>> res = new ArrayList<>();
 
@@ -616,7 +652,7 @@ public class GridReduceQueryExecutor {
 
                             int cols = row.getColumnCount();
 
-                            List<Object> resRow  = new ArrayList<>(cols);
+                            List<Object> resRow = new ArrayList<>(cols);
 
                             for (int c = 0; c < cols; c++)
                                 resRow.add(row.getValue(c).getObject());
@@ -627,32 +663,52 @@ public class GridReduceQueryExecutor {
                         resIter = res.iterator();
                     }
                     else {
-                        GridCacheSqlQuery rdc = qry.reduceQuery();
+                        UUID locNodeId = ctx.localNodeId();
 
-                        // Statement caching is prohibited here because we can't guarantee correct merge index reuse.
-                        ResultSet res = h2.executeSqlQueryWithTimer(space,
-                            r.conn,
-                            rdc.query(),
-                            F.asList(rdc.parameters()),
-                            false);
+                        h2.setupConnection(r.conn, false, enforceJoinOrder);
 
-                        resIter = new Iter(res);
-                    }
-                }
+                        GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
+                            .pageSize(r.pageSize).distributedJoins(false));
 
-                for (GridMergeIndex idx : r.idxs) {
-                    if (!idx.fetchedAll()) // We have to explicitly cancel queries on remote nodes.
-                        send(nodes, new GridQueryCancelRequest(qryReqId), null);
+                        try {
+                            if (qry.explain())
+                                return explainPlan(r.conn, space, qry);
+
+                            GridCacheSqlQuery rdc = qry.reduceQuery();
+
+                            ResultSet res = h2.executeSqlQueryWithTimer(space,
+                                r.conn,
+                                rdc.query(),
+                                F.asList(rdc.parameters()),
+                                false);
+
+                            resIter = new Iter(res);
+                        }
+                        finally {
+                            GridH2QueryContext.clearThreadLocal();
+                        }
+                    }
                 }
 
                 if (retry) {
+                    send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
+
                     if (Thread.currentThread().isInterrupted())
                         throw new IgniteInterruptedCheckedException("Query was interrupted.");
 
                     continue;
                 }
 
-                return new GridQueryCacheObjectsIterator(resIter, cctx, keepBinary);
+                final Collection<ClusterNode> finalNodes = nodes;
+
+                return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable) {
+                    @Override public void close() throws Exception {
+                        super.close();
+
+                        if (distributedJoins || !allIndexesFetched(r.idxs))
+                            send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
+                    }
+                };
             }
             catch (IgniteCheckedException | RuntimeException e) {
                 U.closeQuiet(r.conn);
@@ -678,13 +734,26 @@ public class GridReduceQueryExecutor {
 
                 if (!skipMergeTbl) {
                     for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
-                        fakeTable(null, i).setInnerTable(null); // Drop all merge tables.
+                        fakeTable(null, i).innerTable(null); // Drop all merge tables.
                 }
             }
         }
     }
 
     /**
+     * @param idxs Merge indexes.
+     * @return {@code true} If all remote data was fetched.
+     */
+    private static boolean allIndexesFetched(List<GridMergeIndex> idxs) {
+        for (int i = 0; i <  idxs.size(); i++) {
+            if (!idxs.get(i).fetchedAll())
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
      * @param r Query run.
      * @param nodes Nodes to check periodically if they alive.
      * @throws IgniteInterruptedCheckedException If interrupted.
@@ -715,6 +784,7 @@ public class GridReduceQueryExecutor {
     /**
      * Gets or creates new fake table for index.
      *
+     * @param c Connection.
      * @param idx Index of table.
      * @return Table.
      */
@@ -759,8 +829,8 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx,
+        List<Integer> extraSpaces) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
         Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
@@ -769,15 +839,15 @@ public class GridReduceQueryExecutor {
             return null; // Retry.
 
         if (!F.isEmpty(extraSpaces)) {
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isLocal())
                     continue;
 
                 if (!extraCctx.isReplicated())
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
-                        "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
+                        "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]");
 
                 Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
 
@@ -846,14 +916,14 @@ public class GridReduceQueryExecutor {
      */
     @SuppressWarnings("unchecked")
     private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces) {
+        List<Integer> extraSpaces) {
         assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
 
         final int partsCnt = cctx.affinity().partitions();
 
         if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
@@ -862,7 +932,7 @@ public class GridReduceQueryExecutor {
 
                 if (parts != partsCnt)
                     throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" +
-                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraSpace + ", parts2=" + parts + "]");
+                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]");
             }
         }
 
@@ -885,8 +955,8 @@ public class GridReduceQueryExecutor {
         if (extraSpaces != null) {
             // Find owner intersections for each participating partitioned cache partition.
             // We need this for logical collocation between different partitioned caches with the same affinity.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
@@ -895,10 +965,10 @@ public class GridReduceQueryExecutor {
                     List<ClusterNode> owners = extraCctx.topology().owners(p);
 
                     if (F.isEmpty(owners)) {
-                        if (!F.isEmpty(dataNodes(extraSpace, NONE)))
+                        if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
                             return null; // Retry.
 
-                        throw new CacheException("Failed to find data nodes [cache=" + extraSpace + ", part=" + p + "]");
+                        throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
                     }
 
                     if (partLocs[p] == null)
@@ -913,8 +983,8 @@ public class GridReduceQueryExecutor {
             }
 
             // Filter nodes where not all the replicated caches loaded.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (!extraCctx.isReplicated())
                     continue;
@@ -960,7 +1030,7 @@ public class GridReduceQueryExecutor {
      * @param allSpaces All spaces.
      * @return List of all extra spaces or {@code null} if none.
      */
-    private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
+    private List<String> extraSpaces(String mainSpace, Collection<String> allSpaces) {
         if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
             return null;
 
@@ -996,7 +1066,7 @@ public class GridReduceQueryExecutor {
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
             GridMergeTable tbl = createMergeTable(c, mapQry, false);
 
-            fakeTable(c, tblIdx++).setInnerTable(tbl);
+            fakeTable(c, tblIdx++).innerTable(tbl);
         }
 
         GridCacheSqlQuery rdc = qry.reduceQuery();
@@ -1032,39 +1102,27 @@ public class GridReduceQueryExecutor {
     /**
      * @param nodes Nodes.
      * @param msg Message.
-     * @param partsMap Partitions.
+     * @param specialize Optional closure to specialize message for each node.
+     * @param runLocParallel Run local handler in parallel thread.
      * @return {@code true} If all messages sent successfully.
      */
     private boolean send(
         Collection<ClusterNode> nodes,
         Message msg,
-        Map<ClusterNode,IntArray> partsMap
+        @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
+        boolean runLocParallel
     ) {
-        boolean locNodeFound = false;
-
-        boolean ok = true;
-
-        for (ClusterNode node : nodes) {
-            if (node.isLocal()) {
-                locNodeFound = true;
-
-                continue;
-            }
-
-            try {
-                ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), QUERY_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                ok = false;
-
-                U.warn(log, e.getMessage());
-            }
-        }
-
-        if (locNodeFound) // Local node goes the last to allow parallel execution.
-            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.discovery().localNode(), partsMap));
-
-        return ok;
+        if (log.isDebugEnabled())
+            log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]");
+
+        return h2.send(GridTopic.TOPIC_QUERY,
+            GridTopic.TOPIC_QUERY.ordinal(),
+            nodes,
+            msg,
+            specialize,
+            locNodeHnd,
+            QUERY_POOL,
+            runLocParallel);
     }
 
     /**
@@ -1074,8 +1132,7 @@ public class GridReduceQueryExecutor {
      * @return Copy of message with partitions set.
      */
     private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) {
-        if (partsMap == null)
-            return msg;
+        assert partsMap != null;
 
         GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg);
 
@@ -1083,11 +1140,35 @@ public class GridReduceQueryExecutor {
 
         assert parts != null : node;
 
-        int[] partsArr = new int[parts.size()];
+        res.partitions(toArray(parts));
 
-        parts.toArray(partsArr);
+        return res;
+    }
 
-        res.partitions(partsArr);
+    /**
+     * @param ints Ints.
+     * @return Array.
+     */
+    public static int[] toArray(IntArray ints) {
+        int[] res = new int[ints.size()];
+
+        ints.toArray(res);
+
+        return res;
+    }
+
+    /**
+     * @param m Map.
+     * @return Converted map.
+     */
+    private static Map<UUID, int[]> convert(Map<ClusterNode, IntArray> m) {
+        if (m == null)
+            return null;
+
+        Map<UUID, int[]> res = U.newHashMap(m.size());
+
+        for (Map.Entry<ClusterNode,IntArray> entry : m.entrySet())
+            res.put(entry.getKey().id(), toArray(entry.getValue()));
 
         return res;
     }
@@ -1097,7 +1178,7 @@ public class GridReduceQueryExecutor {
      * @param qry Query.
      * @param explain Explain.
      * @return Table.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
         throws IgniteCheckedException {
@@ -1165,25 +1246,36 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     *
+     * Query run.
      */
     private static class QueryRun {
         /** */
-        private List<GridMergeIndex> idxs;
+        private final List<GridMergeIndex> idxs;
 
         /** */
         private CountDownLatch latch;
 
         /** */
-        private JdbcConnection conn;
+        private final JdbcConnection conn;
 
         /** */
-        private int pageSize;
+        private final int pageSize;
 
         /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
         private final AtomicReference<Object> state = new AtomicReference<>();
 
         /**
+         * @param conn Connection.
+         * @param idxsCnt Number of indexes.
+         * @param pageSize Page size.
+         */
+        private QueryRun(Connection conn, int idxsCnt, int pageSize) {
+            this.conn = (JdbcConnection)conn;
+            this.idxs = new ArrayList<>(idxsCnt);
+            this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+        }
+
+        /**
          * @param o Fail state object.
          * @param nodeId Node ID.
          */
@@ -1240,4 +1332,24 @@ public class GridReduceQueryExecutor {
             return res;
         }
     }
+
+    /**
+     *
+     */
+    private class ExplicitPartitionsSpecializer implements IgniteBiClosure<ClusterNode,Message,Message> {
+        /** */
+        private final Map<ClusterNode,IntArray> partsMap;
+
+        /**
+         * @param partsMap Partitions map.
+         */
+        private ExplicitPartitionsSpecializer(Map<ClusterNode,IntArray> partsMap) {
+            this.partsMap = partsMap;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Message apply(ClusterNode n, Message msg) {
+            return copy(msg, n, partsMap);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
index a38d137..d46fb2f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridThreadLocalTable.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import javax.cache.CacheException;
 import org.h2.api.TableEngine;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.DbObject;
@@ -59,61 +60,74 @@ public class GridThreadLocalTable extends Table {
     /**
      * @param t Table or {@code null} to reset existing.
      */
-    public void setInnerTable(Table t) {
+    public void innerTable(Table t) {
         if (t == null)
             tbl.remove();
         else
             tbl.set(t);
     }
 
+    /**
+     * @return Inner table.
+     */
+    private Table innerTable() {
+        Table t = tbl.get();
+
+        if (t == null)
+            throw new CacheException("Table `" + getName() + "` can be accessed only within Ignite query context.");
+
+        return t;
+    }
+
     /** {@inheritDoc} */
     @Override public Index getPrimaryKey() {
-        return tbl.get().getPrimaryKey();
+        return innerTable().getPrimaryKey();
     }
 
     /** {@inheritDoc} */
     @Override public Column getRowIdColumn() {
-        return tbl.get().getRowIdColumn();
+        return innerTable().getRowIdColumn();
     }
 
     /** {@inheritDoc} */
-    @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
-        return tbl.get().getBestPlanItem(session, masks, filter, sortOrder);
+    @Override public PlanItem getBestPlanItem(Session session, int[] masks, TableFilter[] filters, int filter,
+        SortOrder sortOrder) {
+        return innerTable().getBestPlanItem(session, masks, filters, filter, sortOrder);
     }
 
     /** {@inheritDoc} */
     @Override public Value getDefaultValue(Session session, Column column) {
-        return tbl.get().getDefaultValue(session, column);
+        return innerTable().getDefaultValue(session, column);
     }
 
     /** {@inheritDoc} */
     @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) {
-        return tbl.get().getTemplateSimpleRow(singleColumn);
+        return innerTable().getTemplateSimpleRow(singleColumn);
     }
 
     /** {@inheritDoc} */
     @Override public Row getTemplateRow() {
-        return tbl.get().getTemplateRow();
+        return innerTable().getTemplateRow();
     }
 
     /** {@inheritDoc} */
     @Override public Column getColumn(String columnName) {
-        return tbl.get().getColumn(columnName);
+        return innerTable().getColumn(columnName);
     }
 
     /** {@inheritDoc} */
     @Override public Column getColumn(int index) {
-        return tbl.get().getColumn(index);
+        return innerTable().getColumn(index);
     }
 
     /** {@inheritDoc} */
     @Override public Index getIndexForColumn(Column column) {
-        return tbl.get().getIndexForColumn(column);
+        return innerTable().getIndexForColumn(column);
     }
 
     /** {@inheritDoc} */
     @Override public Column[] getColumns() {
-        return tbl.get().getColumns();
+        return innerTable().getColumns();
     }
 
     /** {@inheritDoc} */
@@ -122,8 +136,8 @@ public class GridThreadLocalTable extends Table {
     }
 
     /** {@inheritDoc} */
-    @Override public void lock(Session session, boolean exclusive, boolean force) {
-        tbl.get().lock(session, exclusive, force);
+    @Override public boolean lock(Session session, boolean exclusive, boolean force) {
+        return innerTable().lock(session, exclusive, force);
     }
 
     /** {@inheritDoc} */
@@ -133,33 +147,33 @@ public class GridThreadLocalTable extends Table {
 
     /** {@inheritDoc} */
     @Override public void unlock(Session s) {
-        tbl.get().unlock(s);
+        innerTable().unlock(s);
     }
 
     /** {@inheritDoc} */
     @Override public Index addIndex(Session session, String indexName, int indexId, IndexColumn[] cols,
         IndexType indexType, boolean create, String indexComment) {
-        return tbl.get().addIndex(session, indexName, indexId, cols, indexType, create, indexComment);
+        return innerTable().addIndex(session, indexName, indexId, cols, indexType, create, indexComment);
     }
 
     /** {@inheritDoc} */
     @Override public void removeRow(Session session, Row row) {
-        tbl.get().removeRow(session, row);
+        innerTable().removeRow(session, row);
     }
 
     /** {@inheritDoc} */
     @Override public void truncate(Session session) {
-        tbl.get().truncate(session);
+        innerTable().truncate(session);
     }
 
     /** {@inheritDoc} */
     @Override public void addRow(Session session, Row row) {
-        tbl.get().addRow(session, row);
+        innerTable().addRow(session, row);
     }
 
     /** {@inheritDoc} */
     @Override public void checkSupportAlter() {
-        tbl.get().checkSupportAlter();
+        innerTable().checkSupportAlter();
     }
 
     /** {@inheritDoc} */
@@ -169,22 +183,22 @@ public class GridThreadLocalTable extends Table {
 
     /** {@inheritDoc} */
     @Override public Index getUniqueIndex() {
-        return tbl.get().getUniqueIndex();
+        return innerTable().getUniqueIndex();
     }
 
     /** {@inheritDoc} */
     @Override public Index getScanIndex(Session session) {
-        return tbl.get().getScanIndex(session);
+        return innerTable().getScanIndex(session);
     }
 
     /** {@inheritDoc} */
     @Override public ArrayList<Index> getIndexes() {
-        return tbl.get().getIndexes();
+        return innerTable().getIndexes();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isLockedExclusively() {
-        return tbl.get().isLockedExclusively();
+        return innerTable().isLockedExclusively();
     }
 
     /** {@inheritDoc} */
@@ -194,12 +208,12 @@ public class GridThreadLocalTable extends Table {
 
     /** {@inheritDoc} */
     @Override public boolean isDeterministic() {
-        return tbl.get().isDeterministic();
+        return innerTable().isDeterministic();
     }
 
     /** {@inheritDoc} */
     @Override public boolean canGetRowCount() {
-        return tbl.get().canGetRowCount();
+        return innerTable().canGetRowCount();
     }
 
     /** {@inheritDoc} */
@@ -209,7 +223,7 @@ public class GridThreadLocalTable extends Table {
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
-        return tbl.get().getRowCount(session);
+        return innerTable().getRowCount(session);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
index 362a760..571f9ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Array.java
@@ -115,7 +115,7 @@ public class GridH2Array extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Array.class);
     }
 
     /** {@inheritDoc} */
@@ -127,4 +127,9 @@ public class GridH2Array extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
index 0628764..edd404e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
@@ -99,7 +99,7 @@ public class GridH2Boolean extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Boolean.class);
     }
 
     /** {@inheritDoc} */
@@ -107,7 +107,13 @@ public class GridH2Boolean extends GridH2ValueMessage {
         return -5;
     }
 
+    /** {@inheritDoc} */
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
index 9f608b9..894794e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Byte.java
@@ -99,7 +99,7 @@ public class GridH2Byte extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Byte.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,9 @@ public class GridH2Byte extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java
index 0fb9e5d..29a52be 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Bytes.java
@@ -24,6 +24,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.h2.value.Value;
 import org.h2.value.ValueBytes;
 
+import static org.h2.util.StringUtils.convertBytesToHex;
+
 /**
  * H2 Bytes.
  */
@@ -99,7 +101,7 @@ public class GridH2Bytes extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Bytes.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +113,9 @@ public class GridH2Bytes extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "b_" + convertBytesToHex(b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
index 1a81234..942ab7c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
@@ -102,7 +102,7 @@ public class GridH2CacheObject extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2CacheObject.class);
     }
 
     /** {@inheritDoc} */
@@ -146,4 +146,9 @@ public class GridH2CacheObject extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 2;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java
index 6bd5237..8025257 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Date.java
@@ -101,7 +101,7 @@ public class GridH2Date extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Date.class);
     }
 
     /** {@inheritDoc} */
@@ -113,4 +113,9 @@ public class GridH2Date extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java
index f5d4865..a3ad444 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Decimal.java
@@ -26,6 +26,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.h2.value.Value;
 import org.h2.value.ValueDecimal;
 
+import static org.h2.util.StringUtils.convertBytesToHex;
+
 /**
  * H2 Decimal.
  */
@@ -121,7 +123,7 @@ public class GridH2Decimal extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Decimal.class);
     }
 
     /** {@inheritDoc} */
@@ -133,4 +135,9 @@ public class GridH2Decimal extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 2;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return scale + "_" + convertBytesToHex(b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java
index 481db30..2ceea8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Double.java
@@ -99,7 +99,7 @@ public class GridH2Double extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Double.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,9 @@ public class GridH2Double extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java
index 55f9380..6923470 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Float.java
@@ -99,7 +99,7 @@ public class GridH2Float extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Float.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,9 @@ public class GridH2Float extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java
index 21070d1..0d118b4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Geometry.java
@@ -25,6 +25,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.h2.value.Value;
 
+import static org.h2.util.StringUtils.convertBytesToHex;
+
 /**
  * H2 Geometry.
  */
@@ -120,7 +122,7 @@ public class GridH2Geometry extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Geometry.class);
     }
 
     /** {@inheritDoc} */
@@ -132,4 +134,9 @@ public class GridH2Geometry extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "g_" + convertBytesToHex(b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
new file mode 100644
index 0000000..e49c48f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeRequest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Range request.
+ */
+public class GridH2IndexRangeRequest implements Message {
+    /** */
+    private UUID originNodeId;
+
+    /** */
+    private long qryId;
+
+    /** */
+    private int batchLookupId;
+
+    /** */
+    @GridDirectCollection(Message.class)
+    private List<GridH2RowRangeBounds> bounds;
+
+    /**
+     * @param bounds Range bounds list.
+     */
+    public void bounds(List<GridH2RowRangeBounds> bounds) {
+        this.bounds = bounds;
+    }
+
+    /**
+     * @return Range bounds list.
+     */
+    public List<GridH2RowRangeBounds> bounds() {
+        return bounds;
+    }
+
+    /**
+     * @return Origin node ID.
+     */
+    public UUID originNodeId() {
+        return originNodeId;
+    }
+
+    /**
+     * @param originNodeId Origin node ID.
+     */
+    public void originNodeId(UUID originNodeId) {
+        this.originNodeId = originNodeId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * @param qryId Query ID.
+     */
+    public void queryId(long qryId) {
+        this.qryId = qryId;
+    }
+
+    /**
+     * @param batchLookupId Batch lookup ID.
+     */
+    public void batchLookupId(int batchLookupId) {
+        this.batchLookupId = batchLookupId;
+    }
+
+    /**
+     * @return Batch lookup ID.
+     */
+    public int batchLookupId() {
+        return batchLookupId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("batchLookupId", batchLookupId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeCollection("bounds", bounds, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeUuid("originNodeId", originNodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("qryId", qryId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                batchLookupId = reader.readInt("batchLookupId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                bounds = reader.readCollection("bounds", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                originNodeId = reader.readUuid("originNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                qryId = reader.readLong("qryId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2IndexRangeRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -30;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2IndexRangeRequest.class, this, "boundsSize", bounds == null ? null : bounds.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
new file mode 100644
index 0000000..c6414bd
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2IndexRangeResponse.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Range response message.
+ */
+public class GridH2IndexRangeResponse implements Message {
+    /** */
+    public static final byte STATUS_OK = 0;
+
+    /** */
+    public static final byte STATUS_ERROR = 1;
+
+    /** */
+    public static final byte STATUS_NOT_FOUND = 2;
+
+    /** */
+    private UUID originNodeId;
+
+    /** */
+    private long qryId;
+
+    /** */
+    private int batchLookupId;
+
+    /** */
+    @GridDirectCollection(Message.class)
+    private List<GridH2RowRange> ranges;
+
+    /** */
+    private byte status;
+
+    /** */
+    private String err;
+
+    /**
+     * @param ranges Ranges.
+     */
+    public void ranges(List<GridH2RowRange> ranges) {
+        this.ranges = ranges;
+    }
+
+    /**
+     * @return Ranges.
+     */
+    public List<GridH2RowRange> ranges() {
+        return ranges;
+    }
+
+    /**
+     * @return Origin node ID.
+     */
+    public UUID originNodeId() {
+        return originNodeId;
+    }
+
+    /**
+     * @param originNodeId Origin node ID.
+     */
+    public void originNodeId(UUID originNodeId) {
+        this.originNodeId = originNodeId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return qryId;
+    }
+
+    /**
+     * @param qryId Query ID.
+     */
+    public void queryId(long qryId) {
+        this.qryId = qryId;
+    }
+
+    /**
+     * @param err Error message.
+     */
+    public void error(String err) {
+        this.err = err;
+    }
+
+    /**
+     * @return Error message or {@code null} if everything is ok.
+     */
+    public String error() {
+        return err;
+    }
+
+    /**
+     * @param status Status.
+     */
+    public void status(byte status) {
+        this.status = status;
+    }
+
+    /**
+     * @return Status.
+     */
+    public byte status() {
+        return status;
+    }
+
+    /**
+     * @param batchLookupId Batch lookup ID.
+     */
+    public void batchLookupId(int batchLookupId) {
+        this.batchLookupId = batchLookupId;
+    }
+
+    /**
+     * @return Batch lookup ID.
+     */
+    public int batchLookupId() {
+        return batchLookupId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("batchLookupId", batchLookupId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeString("err", err))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeUuid("originNodeId", originNodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("qryId", qryId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("ranges", ranges, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeByte("status", status))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                batchLookupId = reader.readInt("batchLookupId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                err = reader.readString("err");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                originNodeId = reader.readUuid("originNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                qryId = reader.readLong("qryId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                ranges = reader.readCollection("ranges", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                status = reader.readByte("status");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2IndexRangeResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -31;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 6;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2IndexRangeResponse.class, this, "rangesSize", ranges == null ? null : ranges.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java
index 30c7623..7c6046c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Integer.java
@@ -99,7 +99,7 @@ public class GridH2Integer extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Integer.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,20 @@ public class GridH2Integer extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj == this || (obj != null && obj.getClass() == GridH2Integer.class && x == ((GridH2Integer)obj).x);
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return x;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java
index edfde33..b989171 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2JavaObject.java
@@ -24,6 +24,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.h2.value.Value;
 import org.h2.value.ValueJavaObject;
 
+import static org.h2.util.StringUtils.convertBytesToHex;
+
 /**
  * H2 Java Object.
  */
@@ -99,7 +101,7 @@ public class GridH2JavaObject extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2JavaObject.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +113,9 @@ public class GridH2JavaObject extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "j_" + convertBytesToHex(b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java
index e6af9e1..3d360f0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Long.java
@@ -99,7 +99,7 @@ public class GridH2Long extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Long.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,9 @@ public class GridH2Long extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
index 2394b78..50a49ba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Null.java
@@ -64,7 +64,13 @@ public class GridH2Null extends GridH2ValueMessage {
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         reader.setBuffer(buf);
 
-        return reader.beforeMessageRead() && super.readFrom(buf, reader);
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        return reader.afterMessageRead(GridH2Null.class);
     }
 
     /** {@inheritDoc} */
@@ -76,4 +82,9 @@ public class GridH2Null extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 0;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "NULL";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
new file mode 100644
index 0000000..dc82b2c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Query request.
+ */
+public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Map query will not destroy context until explicit query cancel request
+     * will be received because distributed join requests can be received.
+     */
+    public static int FLAG_DISTRIBUTED_JOINS = 1;
+
+    /** */
+    private long reqId;
+
+    /** */
+    @GridToStringInclude
+    @GridDirectCollection(Integer.class)
+    private List<Integer> caches;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Explicit partitions mappings for nodes. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = int[].class)
+    private Map<UUID, int[]> parts;
+
+    /** */
+    private int pageSize;
+
+    /** */
+    @GridToStringInclude
+    @GridDirectCollection(Message.class)
+    private List<GridCacheSqlQuery> qrys;
+
+    /** */
+    private byte flags;
+
+    /** */
+    @GridToStringInclude
+    @GridDirectCollection(String.class)
+    private Collection<String> tbls;
+
+    /**
+     * @param tbls Tables.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest tables(Collection<String> tbls) {
+        this.tbls = tbls;
+
+        return this;
+    }
+
+    /**
+     * @return Tables.
+     */
+    public Collection<String> tables() {
+        return tbls;
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest requestId(long reqId) {
+        this.reqId = reqId;
+
+        return this;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @param caches Caches.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest caches(List<Integer> caches) {
+        this.caches = caches;
+
+        return this;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public List<Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+
+        return this;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Explicit partitions mapping.
+     */
+    public Map<UUID,int[]> partitions() {
+        return parts;
+    }
+
+    /**
+     * @param parts Explicit partitions mapping.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest partitions(Map<UUID,int[]> parts) {
+        this.parts = parts;
+
+        return this;
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest pageSize(int pageSize) {
+        this.pageSize = pageSize;
+
+        return this;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @param qrys SQL Queries.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest queries(List<GridCacheSqlQuery> qrys) {
+        this.qrys = qrys;
+
+        return this;
+    }
+
+    /**
+     * @return SQL Queries.
+     */
+    public List<GridCacheSqlQuery> queries() {
+        return qrys;
+    }
+
+    /**
+     * @param flags Flags.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest flags(int flags) {
+        this.flags = (byte)flags;
+
+        return this;
+    }
+
+    /**
+     * @param flags Flags to check.
+     * @return {@code true} If all the requested flags are set to {@code true}.
+     */
+    public boolean isFlagSet(int flags) {
+        return (this.flags & flags) == flags;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void marshall(Marshaller m) {
+        if (F.isEmpty(qrys))
+            return;
+
+        for (GridCacheSqlQuery qry : qrys)
+            qry.marshall(m);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
+        if (F.isEmpty(qrys))
+            return;
+
+        for (GridCacheSqlQuery qry : qrys)
+            qry.unmarshall(m, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("pageSize", pageSize))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                caches = reader.readCollection("caches", MessageCollectionItemType.INT);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                pageSize = reader.readInt("pageSize");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2QueryRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -33;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 8;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2QueryRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java
new file mode 100644
index 0000000..59c548d
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowMessage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * SQL Row message.
+ */
+public class GridH2RowMessage implements Message {
+    /** */
+    @GridDirectCollection(Message.class)
+    @GridToStringInclude
+    private List<GridH2ValueMessage> vals;
+
+    /**
+     * @return Values of row.
+     */
+    public List<GridH2ValueMessage> values() {
+        return vals;
+    }
+
+    /**
+     * @param vals Values of row.
+     */
+    public void values(List<GridH2ValueMessage> vals) {
+        this.vals = vals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2RowMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2RowMessage.class, this);
+    }
+}


Mime
View raw message