ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-5991: SQL: Lazy query execution. This closes #2437.
Date Thu, 17 Aug 2017 15:24:44 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 15710a869 -> 136075ae0


IGNITE-5991: SQL: Lazy query execution. This closes #2437.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/136075ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/136075ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/136075ae

Branch: refs/heads/master
Commit: 136075ae0f7070999dec6913afc8cef1a26eb307
Parents: 15710a8
Author: devozerov <vozerov@gridgain.com>
Authored: Thu Aug 17 18:24:34 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Thu Aug 17 18:24:34 2017 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/SqlFieldsQuery.java      |  38 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  57 ++-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 237 ++++++++---
 .../h2/twostep/GridReduceQueryExecutor.java     |   7 +-
 .../query/h2/twostep/MapNodeResults.java        |  19 +-
 .../query/h2/twostep/MapQueryLazyWorker.java    | 176 +++++++++
 .../query/h2/twostep/MapQueryLazyWorkerKey.java |  97 +++++
 .../query/h2/twostep/MapQueryResult.java        |  46 ++-
 .../query/h2/twostep/MapQueryResults.java       |  26 +-
 .../query/h2/twostep/MapRequestKey.java         |  23 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   9 +
 .../processors/query/LazyQuerySelfTest.java     | 389 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 13 files changed, 1041 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 2838fe3..54f8396 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -71,6 +71,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private boolean replicatedOnly;
 
+    /** */
+    private boolean lazy;
+
     /** Partitions for query */
     private int[] parts;
 
@@ -230,7 +233,7 @@ public class SqlFieldsQuery extends Query<List<?>> {
     /**
      * Check if distributed joins are enabled for this query.
      *
-     * @return {@code true} If distributed joind enabled.
+     * @return {@code true} If distributed joins enabled.
      */
     public boolean isDistributedJoins() {
         return distributedJoins;
@@ -269,6 +272,39 @@ public class SqlFieldsQuery extends Query<List<?>> {
     }
 
     /**
+     * Sets lazy query execution flag.
+     * <p>
+     * By default Ignite attempts to fetch the whole query result set to memory and send it to the client. For small
+     * and medium result sets this provides optimal performance and minimizes the duration of internal database
+     * locks, thus increasing concurrency.
+     * <p>
+     * If the result set is too big to fit in available memory, this could lead to excessive GC pauses and even
+     * OutOfMemoryError. Use this flag as a hint for Ignite to fetch the result set lazily, thus minimizing memory
+     * consumption at the cost of a moderate performance hit.
+     * <p>
+     * Defaults to {@code false}, meaning that the whole result set is fetched to memory eagerly.
+     *
+     * @param lazy Lazy query execution flag.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setLazy(boolean lazy) {
+        this.lazy = lazy;
+
+        return this;
+    }
+
+    /**
+     * Gets lazy query execution flag.
+     * <p>
+     * See {@link #setLazy(boolean)} for more information.
+     *
+     * @return Lazy flag.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
+
+    /**
      * Gets partitions for query, in ascending order.
      */
     @Nullable public int[] getPartitions() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 007eeb1..6896f18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -139,6 +140,7 @@ import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.Prepared;
 import org.h2.command.dml.Insert;
+import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.index.Index;
 import org.h2.jdbc.JdbcPreparedStatement;
@@ -905,24 +907,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
-        int timeoutMillis, @Nullable GridQueryCancel cancel)
-        throws IgniteCheckedException {
+        int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+        final MapQueryLazyWorker lazyWorker = MapQueryLazyWorker.currentWorker();
 
         if (cancel != null) {
             cancel.set(new Runnable() {
                 @Override public void run() {
-                    try {
-                        stmt.cancel();
-                    }
-                    catch (SQLException ignored) {
-                        // No-op.
+                    if (lazyWorker != null) {
+                        lazyWorker.submit(new Runnable() {
+                            @Override public void run() {
+                                cancelStatement(stmt);
+                            }
+                        });
                     }
+                    else
+                        cancelStatement(stmt);
                 }
             });
         }
 
+        Session ses = H2Utils.session(conn);
+
         if (timeoutMillis > 0)
-            H2Utils.session(conn).setQueryTimeout(timeoutMillis);
+            ses.setQueryTimeout(timeoutMillis);
+
+        if (lazyWorker != null)
+            ses.setLazyQueryExecution(true);
 
         try {
             return stmt.executeQuery();
@@ -936,7 +946,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
         finally {
             if (timeoutMillis > 0)
-                H2Utils.session(conn).setQueryTimeout(0);
+                ses.setQueryTimeout(0);
+
+            if (lazyWorker != null)
+                ses.setLazyQueryExecution(false);
+        }
+    }
+
+    /**
+     * Cancel prepared statement.
+     *
+     * @param stmt Statement.
+     */
+    private static void cancelStatement(PreparedStatement stmt) {
+        try {
+            stmt.cancel();
+        }
+        catch (SQLException ignored) {
+            // No-op.
         }
     }
 
@@ -1143,6 +1170,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
      * @param parts Partitions.
+     * @param lazy Lazy query execution flag.
      * @return Iterable result.
      */
     private Iterable<List<?>> runQueryTwoStep(
@@ -1153,12 +1181,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final int timeoutMillis,
         final GridQueryCancel cancel,
         final Object[] params,
-        final int[] parts
+        final int[] parts,
+        final boolean lazy
     ) {
         return new Iterable<List<?>>() {
             @Override public Iterator<List<?>> iterator() {
                 return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
-                    parts);
+                    parts, lazy);
             }
         };
     }
@@ -1402,7 +1431,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
             runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
-                qry.getArgs(), partitions), cancel);
+                qry.getArgs(), partitions, qry.isLazy()), cancel);
 
         cursor.fieldsMeta(meta);
 
@@ -2070,6 +2099,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Stopping cache query index...");
 
+        mapQryExec.cancelLazyWorkers();
+
 //        unregisterMBean(); TODO https://issues.apache.org/jira/browse/IGNITE-2139
         if (ctx != null && !ctx.cache().context().database().persistenceEnabled()) {
             for (H2Schema schema : schemas.values())
@@ -2355,6 +2386,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public void cancelAllQueries() {
+        mapQryExec.cancelLazyWorkers();
+
         for (Connection conn : conns)
             U.close(conn, log);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ca978e2..0cc4172 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -28,7 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -52,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
-import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -69,6 +70,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.thread.IgniteThread;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -107,6 +109,15 @@ public class GridMapQueryExecutor {
     /** */
     private final ConcurrentMap<MapReservationKey, GridReservable> reservations = new ConcurrentHashMap8<>();
 
+    /** Lazy workers. */
+    private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>();
+
+    /** Busy lock for lazy workers. */
+    private final GridSpinBusyLock lazyWorkerBusyLock = new GridSpinBusyLock();
+
+    /** Lazy worker stop guard. */
+    private final AtomicBoolean lazyWorkerStopGuard = new AtomicBoolean();
+
     /**
      * @param busyLock Busy lock.
      */
@@ -162,6 +173,21 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * Cancel active lazy queries and prevent submission of new queries.
+     */
+    public void cancelLazyWorkers() {
+        if (!lazyWorkerStopGuard.compareAndSet(false, true))
+            return;
+
+        lazyWorkerBusyLock.block();
+
+        for (MapQueryLazyWorker worker : lazyWorkers.values())
+            worker.stop();
+
+        lazyWorkers.clear();
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message.
      */
@@ -221,7 +247,7 @@ public class GridMapQueryExecutor {
         MapNodeResults nodeRess = qryRess.get(nodeId);
 
         if (nodeRess == null) {
-            nodeRess = new MapNodeResults();
+            nodeRess = new MapNodeResults(nodeId);
 
             MapNodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
 
@@ -416,6 +442,7 @@ public class GridMapQueryExecutor {
         final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
         final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
         final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+        final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
 
         final List<Integer> cacheIds = req.caches();
 
@@ -429,30 +456,51 @@ public class GridMapQueryExecutor {
 
             final int segment = i;
 
-            ctx.closure().callLocal(
-                new Callable<Void>() {
-                    @Override public Void call() throws Exception {
-                        onQueryRequest0(node,
-                            req.requestId(),
-                            segment,
-                            req.schemaName(),
-                            req.queries(),
-                            cacheIds,
-                            req.topologyVersion(),
-                            partsMap,
-                            parts,
-                            req.tables(),
-                            req.pageSize(),
-                            joinMode,
-                            enforceJoinOrder,
-                            false,
-                            req.timeout(),
-                            params);
-
-                        return null;
+            if (lazy) {
+                onQueryRequest0(node,
+                    req.requestId(),
+                    segment,
+                    req.schemaName(),
+                    req.queries(),
+                    cacheIds,
+                    req.topologyVersion(),
+                    partsMap,
+                    parts,
+                    req.pageSize(),
+                    joinMode,
+                    enforceJoinOrder,
+                    false, // Replicated is always false here (see condition above).
+                    req.timeout(),
+                    params,
+                    true); // Lazy = true.
+            }
+            else {
+                ctx.closure().callLocal(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() throws Exception {
+                            onQueryRequest0(node,
+                                req.requestId(),
+                                segment,
+                                req.schemaName(),
+                                req.queries(),
+                                cacheIds,
+                                req.topologyVersion(),
+                                partsMap,
+                                parts,
+                                req.pageSize(),
+                                joinMode,
+                                enforceJoinOrder,
+                                false,
+                                req.timeout(),
+                                params,
+                                false); // Lazy = false.
+
+                            return null;
+                        }
                     }
-                }
-                , QUERY_POOL);
+                    , QUERY_POOL);
+            }
         }
 
         onQueryRequest0(node,
@@ -464,13 +512,13 @@ public class GridMapQueryExecutor {
             req.topologyVersion(),
             partsMap,
             parts,
-            req.tables(),
             req.pageSize(),
             joinMode,
             enforceJoinOrder,
             replicated,
             req.timeout(),
-            params);
+            params,
+            lazy);
     }
 
     /**
@@ -483,28 +531,61 @@ public class GridMapQueryExecutor {
      * @param topVer Topology version.
      * @param partsMap Partitions map for unstable topology.
      * @param parts Explicit partitions for current node.
-     * @param tbls Tables.
      * @param pageSize Page size.
      * @param distributedJoinMode Query distributed join mode.
+     * @param lazy Streaming flag.
      */
     private void onQueryRequest0(
-        ClusterNode node,
-        long reqId,
-        int segmentId,
-        String schemaName,
-        Collection<GridCacheSqlQuery> qrys,
-        List<Integer> cacheIds,
-        AffinityTopologyVersion topVer,
-        Map<UUID, int[]> partsMap,
-        int[] parts,
-        Collection<QueryTable> tbls,
-        int pageSize,
-        DistributedJoinMode distributedJoinMode,
-        boolean enforceJoinOrder,
-        boolean replicated,
-        int timeout,
-        Object[] params
+        final ClusterNode node,
+        final long reqId,
+        final int segmentId,
+        final String schemaName,
+        final Collection<GridCacheSqlQuery> qrys,
+        final List<Integer> cacheIds,
+        final AffinityTopologyVersion topVer,
+        final Map<UUID, int[]> partsMap,
+        final int[] parts,
+        final int pageSize,
+        final DistributedJoinMode distributedJoinMode,
+        final boolean enforceJoinOrder,
+        final boolean replicated,
+        final int timeout,
+        final Object[] params,
+        boolean lazy
     ) {
+        if (lazy && MapQueryLazyWorker.currentWorker() == null) {
+            // Lazy queries must be re-submitted to dedicated workers.
+            MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+            MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log, this);
+
+            worker.submit(new Runnable() {
+                @Override public void run() {
+                    onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts,
+                        pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true);
+                }
+            });
+
+            if (lazyWorkerBusyLock.enterBusy()) {
+                try {
+                    MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
+
+                    if (oldWorker != null)
+                        oldWorker.stop();
+
+                    IgniteThread thread = new IgniteThread(worker);
+
+                    thread.start();
+                }
+                finally {
+                    lazyWorkerBusyLock.leaveBusy();
+                }
+            }
+            else
+                log.info("Ignored query request (node is stopping) [nodeId=" + node.id() + ", reqId=" + reqId + ']');
+
+            return;
+        }
+
         // Prepare to run queries.
         GridCacheContext<?, ?> mainCctx =
             !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
@@ -519,13 +600,18 @@ public class GridMapQueryExecutor {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
                 if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+                    // Unregister lazy worker because re-try may never reach this node again.
+                    if (lazy)
+                        stopAndUnregisterCurrentLazyWorker();
+
                     sendRetry(node, reqId, segmentId);
 
                     return;
                 }
             }
 
-            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null);
+            qr = new MapQueryResults(h2, reqId, qrys.size(), mainCctx != null ? mainCctx.name() : null,
+                MapQueryLazyWorker.currentWorker());
 
             if (nodeRess.put(reqId, segmentId, qr) != null)
                 throw new IllegalStateException();
@@ -570,8 +656,7 @@ public class GridMapQueryExecutor {
                     ResultSet rs = null;
 
                     // If we are not the target node for this replicated query, just ignore it.
-                    if (qry.node() == null ||
-                        (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
+                    if (qry.node() == null || (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
                         rs = h2.executeSqlQueryWithTimer(conn, qry.query(),
                             F.asList(qry.parameters(params)), true,
                             timeout,
@@ -624,6 +709,10 @@ public class GridMapQueryExecutor {
                 qr.cancel(false);
             }
 
+            // Unregister worker after possible cancellation.
+            if (lazy)
+                stopAndUnregisterCurrentLazyWorker();
+
             if (X.hasCause(e, GridH2RetryException.class))
                 sendRetry(node, reqId, segmentId);
             else {
@@ -672,27 +761,39 @@ public class GridMapQueryExecutor {
      * @param node Node.
      * @param req Request.
      */
-    private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
-        MapNodeResults nodeRess = qryRess.get(node.id());
+    private void onNextPageRequest(final ClusterNode node, final GridQueryNextPageRequest req) {
+        final MapNodeResults nodeRess = qryRess.get(node.id());
 
         if (nodeRess == null) {
             sendError(node, req.queryRequestId(), new CacheException("No node result found for request: " + req));
 
             return;
-        } else if (nodeRess.cancelled(req.queryRequestId())) {
+        }
+        else if (nodeRess.cancelled(req.queryRequestId())) {
             sendError(node, req.queryRequestId(), new QueryCancelledException());
 
             return;
         }
 
-        MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
+        final MapQueryResults qr = nodeRess.get(req.queryRequestId(), req.segmentId());
 
         if (qr == null)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
         else if (qr.cancelled())
             sendError(node, req.queryRequestId(), new QueryCancelledException());
-        else
-            sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+        else {
+            MapQueryLazyWorker lazyWorker = qr.lazyWorker();
+
+            if (lazyWorker != null) {
+                lazyWorker.submit(new Runnable() {
+                    @Override public void run() {
+                        sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+                    }
+                });
+            }
+            else
+                sendNextPage(nodeRess, node, qr, req.query(), req.segmentId(), req.pageSize());
+        }
     }
 
     /**
@@ -784,4 +885,34 @@ public class GridMapQueryExecutor {
                 reservations.remove(grpKey);
         }
     }
+
+    /**
+     * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
+     */
+    public void stopAndUnregisterCurrentLazyWorker() {
+        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+        if (worker != null) {
+            worker.stop();
+
+            // Stopping alone is not enough: the worker may be registered but never started due to an exception.
+            unregisterLazyWorker(worker);
+        }
+    }
+
+    /**
+     * Unregister lazy worker.
+     *
+     * @param worker Worker.
+     */
+    public void unregisterLazyWorker(MapQueryLazyWorker worker) {
+        lazyWorkers.remove(worker.key(), worker);
+    }
+
+    /**
+     * @return Number of registered lazy workers.
+     */
+    public int registeredLazyWorkers() {
+        return lazyWorkers.size();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0e9d1a2..8638794 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -506,6 +506,7 @@ public class GridReduceQueryExecutor {
      * @param cancel Query cancel.
      * @param params Query parameters.
      * @param parts Partitions.
+     * @param lazy Lazy execution flag.
      * @return Rows iterator.
      */
     public Iterator<List<?>> query(
@@ -516,7 +517,8 @@ public class GridReduceQueryExecutor {
         int timeoutMillis,
         GridQueryCancel cancel,
         Object[] params,
-        final int[] parts
+        final int[] parts,
+        boolean lazy
     ) {
         if (F.isEmpty(params))
             params = EMPTY_PARAMS;
@@ -712,6 +714,9 @@ public class GridReduceQueryExecutor {
                 if (isReplicatedOnly)
                     flags |= GridH2QueryRequest.FLAG_REPLICATED;
 
+                if (lazy && mapQrys.size() == 1)
+                    flags |= GridH2QueryRequest.FLAG_LAZY;
+
                 GridH2QueryRequest req = new GridH2QueryRequest()
                     .requestId(qryReqId)
                     .topologyVersion(topVer)

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index d5ea357..2d20c8d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.jsr166.ConcurrentHashMap8;
 
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
 import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -35,6 +36,18 @@ class MapNodeResults {
     private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
         new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
 
+    /** Node ID. */
+    private final UUID nodeId;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeId Node ID.
+     */
+    public MapNodeResults(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
     /**
      * @param reqId Query Request ID.
      * @return {@code False} if query was already cancelled.
@@ -59,7 +72,7 @@ class MapNodeResults {
      * @return query partial results.
      */
     public MapQueryResults get(long reqId, int segmentId) {
-        return res.get(new MapRequestKey(reqId, segmentId));
+        return res.get(new MapRequestKey(nodeId, reqId, segmentId));
     }
 
     /**
@@ -84,7 +97,7 @@ class MapNodeResults {
      * @return {@code True} if removed.
      */
     public boolean remove(long reqId, int segmentId, MapQueryResults qr) {
-        return res.remove(new MapRequestKey(reqId, segmentId), qr);
+        return res.remove(new MapRequestKey(nodeId, reqId, segmentId), qr);
     }
 
     /**
@@ -94,7 +107,7 @@ class MapNodeResults {
      * @return previous value.
      */
     public MapQueryResults put(long reqId, int segmentId, MapQueryResults qr) {
-        return res.put(new MapRequestKey(reqId, segmentId), qr);
+        return res.put(new MapRequestKey(nodeId, reqId, segmentId), qr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
new file mode 100644
index 0000000..5158035
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorker.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.LongAdder8;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Worker for lazy query execution.
+ */
+public class MapQueryLazyWorker extends GridWorker {
+    /** Lazy worker bound to the current thread (if any). */
+    private static final ThreadLocal<MapQueryLazyWorker> LAZY_WORKER = new ThreadLocal<>();
+
+    /** Active lazy worker count (for testing purposes). */
+    private static final LongAdder8 ACTIVE_CNT = new LongAdder8();
+
+    /** Tasks to be executed. */
+    private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
+
+    /** Key. */
+    private final MapQueryLazyWorkerKey key;
+
+    /** Map query executor. */
+    private final GridMapQueryExecutor exec;
+
+    /** Latch decremented when worker finishes. */
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    /** Map query result. */
+    private volatile MapQueryResult res;
+
+    /**
+     * Constructor.
+     *
+     * @param instanceName Instance name.
+     * @param key Lazy worker key.
+     * @param log Logger.
+     * @param exec Map query executor.
+     */
+    public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log,
+        GridMapQueryExecutor exec) {
+        super(instanceName, workerName(instanceName, key), log);
+
+        this.key = key;
+        this.exec = exec;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        LAZY_WORKER.set(this);
+
+        ACTIVE_CNT.increment();
+
+        try {
+            while (!isCancelled()) {
+                Runnable task = tasks.take();
+
+                if (task != null)
+                    task.run();
+            }
+        }
+        finally {
+            if (res != null)
+                res.close();
+
+            LAZY_WORKER.set(null);
+
+            ACTIVE_CNT.decrement();
+
+            exec.unregisterLazyWorker(this);
+        }
+    }
+
+    /**
+     * Submit task to worker.
+     *
+     * @param task Task to be executed.
+     */
+    public void submit(Runnable task) {
+        tasks.add(task);
+    }
+
+    /**
+     * @return Worker key.
+     */
+    public MapQueryLazyWorkerKey key() {
+        return key;
+    }
+
+    /**
+     * Stop the worker.
+     */
+    public void stop() {
+        if (MapQueryLazyWorker.currentWorker() == null)
+            submit(new Runnable() {
+                @Override public void run() {
+                    stop();
+                }
+            });
+        else {
+            isCancelled = true;
+
+            stopLatch.countDown();
+        }
+    }
+
+    /**
+     * Await worker stop.
+     */
+    public void awaitStop() {
+        try {
+            U.await(stopLatch);
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw new IgniteException("Failed to wait for lazy worker stop (interrupted): " + name(), e);
+        }
+    }
+
+    /**
+     * @param res Map query result.
+     */
+    public void result(MapQueryResult res) {
+        this.res = res;
+    }
+
+    /**
+     * @return Current worker or {@code null} if call is performed not from lazy worker thread.
+     */
+    @Nullable public static MapQueryLazyWorker currentWorker() {
+        return LAZY_WORKER.get();
+    }
+
+    /**
+     * @return Active workers count.
+     */
+    public static int activeCount() {
+        return ACTIVE_CNT.intValue();
+    }
+
+    /**
+     * Construct worker name.
+     *
+     * @param instanceName Instance name.
+     * @param key Key.
+     * @return Name.
+     */
+    private static String workerName(String instanceName, MapQueryLazyWorkerKey key) {
+        return "query-lazy-worker_" + instanceName + "_" + key.nodeId() + "_" + key.queryRequestId() + "_" +
+            key.segment();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
new file mode 100644
index 0000000..a0f5ebb
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryLazyWorkerKey.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Key to identify lazy worker.
+ */
+public class MapQueryLazyWorkerKey {
+    /** Client node ID. */
+    private final UUID nodeId;
+
+    /** Query request ID. */
+    private final long qryReqId;
+
+    /** Segment. */
+    private final int segment;
+
+    /**
+     * Constructor.
+     *
+     * @param nodeId Node ID.
+     * @param qryReqId Query request ID.
+     * @param segment Segment.
+     */
+    public MapQueryLazyWorkerKey(UUID nodeId, long qryReqId, int segment) {
+        this.nodeId = nodeId;
+        this.qryReqId = qryReqId;
+        this.segment = segment;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Segment.
+     */
+    public int segment() {
+        return segment;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = nodeId.hashCode();
+
+        res = 31 * res + (int)(qryReqId ^ (qryReqId >>> 32));
+        res = 31 * res + segment;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj != null && obj instanceof MapQueryLazyWorkerKey) {
+            MapQueryLazyWorkerKey other = (MapQueryLazyWorkerKey)obj;
+
+            return F.eq(qryReqId, other.qryReqId) && F.eq(nodeId, other.nodeId) && F.eq(segment, other.segment);
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MapQueryLazyWorkerKey.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 4799e03..e54c784d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.jdbc.JdbcResultSet;
+import org.h2.result.LazyResult;
 import org.h2.result.ResultInterface;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -41,7 +42,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 /**
  * Mapper result for a single part of the query.
  */
-class MapQueryResult implements AutoCloseable {
+class MapQueryResult {
     /** */
     private static final Field RESULT_FIELD;
 
@@ -95,24 +96,30 @@ class MapQueryResult implements AutoCloseable {
     /** */
     private final Object[] params;
 
+    /** Lazy worker. */
+    private final MapQueryLazyWorker lazyWorker;
+
     /**
      * @param rs Result set.
      * @param cacheName Cache name.
      * @param qrySrcNodeId Query source node.
      * @param qry Query.
      * @param params Query params.
+     * @param lazyWorker Lazy worker.
      */
     MapQueryResult(IgniteH2Indexing h2, ResultSet rs, @Nullable String cacheName,
-        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params) {
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, @Nullable MapQueryLazyWorker lazyWorker) {
         this.h2 = h2;
         this.cacheName = cacheName;
         this.qry = qry;
         this.params = params;
         this.qrySrcNodeId = qrySrcNodeId;
         this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
+        this.lazyWorker = lazyWorker;
 
         if (rs != null) {
             this.rs = rs;
+
             try {
                 res = (ResultInterface)RESULT_FIELD.get(rs);
             }
@@ -120,7 +127,7 @@ class MapQueryResult implements AutoCloseable {
                 throw new IllegalStateException(e); // Must not happen.
             }
 
-            rowCnt = res.getRowCount();
+            rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
             cols = res.getVisibleColumnCount();
         }
         else {
@@ -167,6 +174,8 @@ class MapQueryResult implements AutoCloseable {
      * @return {@code true} If there are no more rows available.
      */
     synchronized boolean fetchNextPage(List<Value[]> rows, int pageSize) {
+        assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
+
         if (closed)
             return true;
 
@@ -246,13 +255,34 @@ class MapQueryResult implements AutoCloseable {
         return res;
     }
 
-    /** {@inheritDoc} */
-    @Override public synchronized void close() {
-        if (closed)
+    /**
+     * Close the result.
+     */
+    public void close() {
+        if (lazyWorker != null && MapQueryLazyWorker.currentWorker() == null) {
+            lazyWorker.submit(new Runnable() {
+                @Override public void run() {
+                    close();
+                }
+            });
+
+            lazyWorker.awaitStop();
+
             return;
+        }
 
-        closed = true;
+        synchronized (this) {
+            assert lazyWorker == null || lazyWorker == MapQueryLazyWorker.currentWorker();
 
-        U.closeQuiet(rs);
+            if (closed)
+                return;
+
+            closed = true;
+
+            U.closeQuiet(rs);
+
+            if (lazyWorker != null)
+                lazyWorker.stop();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index 7ad1d14..99f1966 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -45,20 +45,27 @@ class MapQueryResults {
     /** */
     private final String cacheName;
 
+    /** Lazy worker. */
+    private final MapQueryLazyWorker lazyWorker;
+
     /** */
     private volatile boolean cancelled;
 
     /**
+     * Constructor.
+     *
      * @param qryReqId Query request ID.
      * @param qrys Number of queries.
      * @param cacheName Cache name.
+     * @param lazyWorker Lazy worker (if any).
      */
     @SuppressWarnings("unchecked")
-    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys,
-        @Nullable String cacheName) {
+    MapQueryResults(IgniteH2Indexing h2, long qryReqId, int qrys, @Nullable String cacheName,
+        @Nullable MapQueryLazyWorker lazyWorker) {
         this.h2 = h2;
         this.qryReqId = qryReqId;
         this.cacheName = cacheName;
+        this.lazyWorker = lazyWorker;
 
         results = new AtomicReferenceArray<>(qrys);
         cancels = new GridQueryCancel[qrys];
@@ -86,13 +93,25 @@ class MapQueryResults {
     }
 
     /**
+     * @return Lazy worker.
+     */
+    MapQueryLazyWorker lazyWorker() {
+        return lazyWorker;
+    }
+
+    /**
+     * Add result.
+     *
      * @param qry Query result index.
      * @param q Query object.
      * @param qrySrcNodeId Query source node.
      * @param rs Result set.
      */
     void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
-        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params);
+        MapQueryResult res = new MapQueryResult(h2, rs, cacheName, qrySrcNodeId, q, params, lazyWorker);
+
+        if (lazyWorker != null)
+            lazyWorker.result(res);
 
         if (!results.compareAndSet(qry, null, res))
             throw new IllegalStateException();
@@ -130,6 +149,7 @@ class MapQueryResults {
                 continue;
             }
 
+            // NB: Cancel is already safe even for lazy queries (see implementation of passed Runnable).
             if (forceQryCancel) {
                 GridQueryCancel cancel = cancels[i];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
index 6feb8ea..9d987db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapRequestKey.java
@@ -17,18 +17,32 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.UUID;
+
 /**
  * Mapper request key.
  */
 class MapRequestKey {
+    /** Node ID. */
+    private UUID nodeId;
+
     /** */
     private long reqId;
 
     /** */
     private int segmentId;
 
-    /** Constructor */
-    MapRequestKey(long reqId, int segmentId) {
+    /**
+     * Constructor.
+     *
+     * @param nodeId Node ID.
+     * @param reqId Request ID.
+     * @param segmentId Segment ID.
+     */
+    MapRequestKey(UUID nodeId, long reqId, int segmentId) {
+        this.nodeId = nodeId;
         this.reqId = reqId;
         this.segmentId = segmentId;
     }
@@ -50,14 +64,15 @@ class MapRequestKey {
 
         MapRequestKey other = (MapRequestKey)o;
 
-        return reqId == other.reqId && segmentId == other.segmentId;
+        return F.eq(nodeId, other.nodeId) && reqId == other.reqId && segmentId == other.segmentId;
 
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        int res = (int)(reqId ^ (reqId >>> 32));
+        int res = nodeId != null ? nodeId.hashCode() : 0;
 
+        res = 31 * res + (int)(reqId ^ (reqId >>> 32));
         res = 31 * res + segmentId;
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 93a383c..4e1fadb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -78,6 +78,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      */
     public static final int FLAG_REPLICATED = 1 << 4;
 
+    /**
+     * If lazy execution is enabled.
+     */
+    public static final int FLAG_LAZY = 1 << 5;
+
     /** */
     private long reqId;
 
@@ -185,6 +190,10 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     }
 
     /**
+     * Get tables.
+     * <p>
+     * N.B.: Was used in AI 1.9 for snapshots. Unused at the moment, but should be kept for compatibility reasons.
+     *
      * @return Tables.
      */
     public Collection<QueryTable> tables() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
new file mode 100644
index 0000000..d5cc0eb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LazyQuerySelfTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.twostep.MapQueryLazyWorker;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Tests for lazy query execution.
+ */
+public class LazyQuerySelfTest extends GridCommonAbstractTest {
+    /** Keys count. */
+    private static final int KEY_CNT = 200;
+
+    /** Base query argument. */
+    private static final int BASE_QRY_ARG = 50;
+
+    /** Size for small pages. */
+    private static final int PAGE_SIZE_SMALL = 12;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Test local query execution.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNode() throws Exception {
+        checkSingleNode(1);
+    }
+
+    /**
+     * Test local query execution with query parallelism.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleNodeWithParallelism() throws Exception {
+        checkSingleNode(4);
+    }
+
+    /**
+     * Test query execution with multiple topology nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodes() throws Exception {
+        checkMultipleNodes(1);
+    }
+
+    /**
+     * Test query execution with multiple topology nodes with query parallelism.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultipleNodesWithParallelism() throws Exception {
+        checkMultipleNodes(4);
+    }
+
+    /**
+     * Check local query execution.
+     *
+     * @param parallelism Query parallelism.
+     * @throws Exception If failed.
+     */
+    public void checkSingleNode(int parallelism) throws Exception {
+        Ignite srv = startGrid();
+
+        srv.createCache(cacheConfiguration(parallelism));
+
+        populateBaseQueryData(srv);
+
+        checkBaseOperations(srv);
+    }
+
+    /**
+     * Check query execution with multiple topology nodes.
+     *
+     * @param parallelism Query parallelism.
+     * @throws Exception If failed.
+     */
+    public void checkMultipleNodes(int parallelism) throws Exception {
+        Ignite srv1 = startGrid(1);
+        Ignite srv2 = startGrid(2);
+
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
+
+        cli.createCache(cacheConfiguration(parallelism));
+
+        populateBaseQueryData(cli);
+
+        checkBaseOperations(srv1);
+        checkBaseOperations(srv2);
+        checkBaseOperations(cli);
+
+        // Test originating node leave.
+        FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+        Iterator<List<?>> iter = cursor.iterator();
+
+        for (int i = 0; i < 30; i++)
+            iter.next();
+
+        stopGrid(3);
+
+        assertNoWorkers();
+
+        // Test server node leave with active worker.
+        cursor = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+        try {
+            iter = cursor.iterator();
+
+            for (int i = 0; i < 30; i++)
+                iter.next();
+
+            stopGrid(2);
+        }
+        finally {
+            cursor.close();
+        }
+
+        assertNoWorkers();
+    }
+
+    /**
+     * Check base operations.
+     *
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    private void checkBaseOperations(Ignite node) throws Exception {
+        // Get full data.
+        List<List<?>> rows = execute(node, baseQuery()).getAll();
+
+        assertBaseQueryResults(rows);
+        assertNoWorkers();
+
+        // Get data in several pages.
+        rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll();
+
+        assertBaseQueryResults(rows);
+        assertNoWorkers();
+
+        // Test full iteration.
+        rows = new ArrayList<>();
+
+        FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL));
+
+        for (List<?> row : cursor)
+            rows.add(row);
+
+        assertBaseQueryResults(rows);
+        assertNoWorkers();
+
+        // Test partial iteration with cursor close.
+        try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) {
+            Iterator<List<?>> iter = partialCursor.iterator();
+
+            for (int i = 0; i < 30; i++)
+                iter.next();
+        }
+
+        assertNoWorkers();
+
+        // Test execution of multiple queries at a time.
+        List<Iterator<List<?>>> iters = new ArrayList<>();
+
+        for (int i = 0; i < 200; i++)
+            iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator());
+
+        while (!iters.isEmpty()) {
+            Iterator<Iterator<List<?>>> iterIter = iters.iterator();
+
+            while (iterIter.hasNext()) {
+                Iterator<List<?>> iter = iterIter.next();
+
+                int i = 0;
+
+                while (iter.hasNext() && i < 20) {
+                    iter.next();
+
+                    i++;
+                }
+
+                if (!iter.hasNext())
+                    iterIter.remove();
+            }
+        }
+
+        assertNoWorkers();
+    }
+
+    /**
+     * Populate base query data.
+     *
+     * @param node Node.
+     */
+    private static void populateBaseQueryData(Ignite node) {
+        IgniteCache<Long, Person> cache = cache(node);
+
+        for (long i = 0; i < KEY_CNT; i++)
+            cache.put(i, new Person(i));
+    }
+
+    /**
+     * @return Query with randomized argument.
+     */
+    private static SqlFieldsQuery randomizedQuery() {
+        return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2));
+    }
+
+    /**
+     * @return Base query.
+     */
+    private static SqlFieldsQuery baseQuery() {
+        return query(BASE_QRY_ARG);
+    }
+
+    /**
+     * @param parallelism Query parallelism.
+     * @return Default cache configuration.
+     */
+    private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism) {
+        return new CacheConfiguration<Long, Person>().setName(CACHE_NAME).setIndexedTypes(Long.class, Person.class)
+            .setQueryParallelism(parallelism);
+    }
+
+    /**
+     * Default query.
+     *
+     * @param arg Argument.
+     * @return Query.
+     */
+    private static SqlFieldsQuery query(long arg) {
+        return new SqlFieldsQuery("SELECT id, name FROM Person WHERE id >= ?").setArgs(arg);
+    }
+
+    /**
+     * Assert base query results.
+     *
+     * @param rows Result rows.
+     */
+    private static void assertBaseQueryResults(List<List<?>> rows) {
+        assertEquals(KEY_CNT - BASE_QRY_ARG, rows.size());
+
+        for (List<?> row : rows) {
+            Long id = (Long)row.get(0);
+            String name = (String)row.get(1);
+
+            assertTrue(id >= BASE_QRY_ARG);
+            assertEquals(nameForId(id), name);
+        }
+    }
+
+    /**
+     * Get cache for node.
+     *
+     * @param node Node.
+     * @return Cache.
+     */
+    private static IgniteCache<Long, Person> cache(Ignite node) {
+        return node.cache(CACHE_NAME);
+    }
+
+    /**
+     * Execute query in lazy mode on the cache of the given node.
+     *
+     * @param node Node.
+     * @param qry Query.
+     * @return Cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private static FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) {
+        return cache(node).query(qry.setLazy(true));
+    }
+
+    /**
+     * Make sure that there are no active lazy workers.
+     *
+     * @throws Exception If failed.
+     */
+    private static void assertNoWorkers() throws Exception {
+        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Ignite node : Ignition.allGrids()) {
+                    IgniteH2Indexing idx = (IgniteH2Indexing) ((IgniteKernal)node).context().query().getIndexing();
+
+                    if (idx.mapQueryExecutor().registeredLazyWorkers() != 0)
+                        return false;
+                }
+
+                return MapQueryLazyWorker.activeCount() == 0;
+            }
+        }, 1000L);
+    }
+
+    /**
+     * Get name for ID.
+     *
+     * @param id ID.
+     * @return Name.
+     */
+    private static String nameForId(long id) {
+        return "name-" + id;
+    }
+
+    /**
+     * Person class.
+     */
+    private static class Person {
+        /** ID. */
+        @QuerySqlField(index = true)
+        private long id;
+
+        /** Name. */
+        @QuerySqlField
+        private String name;
+
+        /**
+         * Constructor.
+         *
+         * @param id ID.
+         */
+        public Person(long id) {
+            this.id = id;
+            this.name = nameForId(id);
+        }
+
+        /**
+         * @return ID.
+         */
+        public long id() {
+            return id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String name() {
+            return name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/136075ae/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 99b0370..5ac0655f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -127,6 +127,7 @@ import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
+import org.apache.ignite.internal.processors.query.LazyQuerySelfTest;
 import org.apache.ignite.internal.processors.query.SqlSchemaSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingInMemSelfTest;
 import org.apache.ignite.internal.processors.query.h2.GridH2IndexingOffheapSelfTest;
@@ -184,6 +185,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IncorrectQueryEntityTest.class);
 
         // Queries tests.
+        suite.addTestSuite(LazyQuerySelfTest.class);
         suite.addTestSuite(IgniteSqlSplitterSelfTest.class);
         suite.addTestSuite(IgniteSqlSegmentedIndexSelfTest.class);
         suite.addTestSuite(IgniteSqlSegmentedIndexMultiNodeSelfTest.class);


Mime
View raw message