ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [32/43] ignite git commit: ignite-2921: ScanQueries over local partitions performance optimisation
Date Thu, 19 May 2016 09:38:05 GMT
ignite-2921: ScanQueries over local partitions performance optimisation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bcb3e104
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bcb3e104
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bcb3e104

Branch: refs/heads/ignite-3163
Commit: bcb3e104bfc8c8fbc882a475feecf9efef4e17d8
Parents: 40fc2ec
Author: ashutak <ashutak@gridgain.com>
Authored: Mon May 16 16:27:20 2016 +0300
Committer: ashutak <ashutak@gridgain.com>
Committed: Mon May 16 16:27:20 2016 +0300

----------------------------------------------------------------------
 .../cache/CacheWeakQueryIteratorsHolder.java    | 169 ++++++-
 .../processors/cache/GridCacheAdapter.java      |  10 +-
 .../processors/cache/GridCacheSwapManager.java  |  12 +-
 .../processors/cache/IgniteCacheProxy.java      |  36 +-
 .../binary/CacheObjectBinaryProcessorImpl.java  |  16 +-
 .../processors/cache/query/CacheQuery.java      |  10 +-
 .../cache/query/CacheQueryFuture.java           |  13 +-
 .../query/GridCacheDistributedQueryManager.java | 100 +++-
 .../cache/query/GridCacheLocalQueryManager.java |  34 +-
 .../cache/query/GridCacheQueryAdapter.java      | 175 +++++--
 .../cache/query/GridCacheQueryErrorFuture.java  |  12 +-
 .../query/GridCacheQueryFutureAdapter.java      |  10 +-
 .../cache/query/GridCacheQueryManager.java      | 502 ++++++++++++++-----
 .../datastructures/GridCacheSetImpl.java        |   4 +-
 .../service/GridServiceProcessor.java           |  45 +-
 ...achePartitionedPreloadLifecycleSelfTest.java | 102 +---
 ...CacheReplicatedPreloadLifecycleSelfTest.java | 132 +----
 .../CacheAbstractQueryMetricsSelfTest.java      |   4 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java |  34 +-
 19 files changed, 860 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
index 4c48e74..2e03b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -34,11 +35,10 @@ import org.jsr166.ConcurrentHashMap8;
  */
 public class CacheWeakQueryIteratorsHolder<V> {
     /** Iterators weak references queue. */
-    private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>();
+    private final ReferenceQueue refQueue = new ReferenceQueue();
 
     /** Iterators futures. */
-    private final Map<WeakReference<WeakQueryFutureIterator>,CacheQueryFuture<V>> futs =
-        new ConcurrentHashMap8<>();
+    private final Map<WeakReference, AutoCloseable> refs = new ConcurrentHashMap8<>();
 
     /** Logger. */
     private final IgniteLogger log;
@@ -56,10 +56,27 @@ public class CacheWeakQueryIteratorsHolder<V> {
      * @param <T> Type for the iterator.
      * @return Iterator over the cache.
      */
-    public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) {
+    public <T> WeakReferenceCloseableIterator<T> iterator(final CacheQueryFuture<V> fut,
+        CacheIteratorConverter<T, V> convert) {
         WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert);
 
-        CacheQueryFuture<V> old = futs.put(it.weakReference(), fut);
+        AutoCloseable old = refs.put(it.weakReference(), fut);
+
+        assert old == null;
+
+        return it;
+    }
+
+    /**
+     * @param iter Closeable iterator.
+     * @param <T> Type for the iterator.
+     * @return Iterator over the cache.
+     */
+    public <T> WeakReferenceCloseableIterator<T> iterator(final GridCloseableIterator<V> iter,
+        CacheIteratorConverter<T, V> convert) {
+        WeakQueryCloseableIterator it = new WeakQueryCloseableIterator(iter, convert);
+
+        AutoCloseable old = refs.put(it.weakReference(), iter);
 
         assert old == null;
 
@@ -71,8 +88,8 @@ public class CacheWeakQueryIteratorsHolder<V> {
      *
      * @throws IgniteCheckedException If failed.
      */
-    public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedException {
-        futs.remove(it.weakReference());
+    public void removeIterator(WeakReferenceCloseableIterator it) throws IgniteCheckedException {
+        refs.remove(it.weakReference());
 
         it.close();
     }
@@ -81,17 +98,17 @@ public class CacheWeakQueryIteratorsHolder<V> {
      * Closes unreachable iterators.
      */
     public void checkWeakQueue() {
-        for (Reference<? extends WeakQueryFutureIterator> itRef = refQueue.poll(); itRef != null;
+        for (Reference itRef = refQueue.poll(); itRef != null;
             itRef = refQueue.poll()) {
             try {
-                WeakReference<WeakQueryFutureIterator> weakRef = (WeakReference<WeakQueryFutureIterator>)itRef;
+                WeakReference weakRef = (WeakReference)itRef;
 
-                CacheQueryFuture<?> fut = futs.remove(weakRef);
+                AutoCloseable rsrc = refs.remove(weakRef);
 
-                if (fut != null)
-                    fut.cancel();
+                if (rsrc != null)
+                    rsrc.close();
             }
-            catch (IgniteCheckedException e) {
+            catch (Exception e) {
                 U.error(log, "Failed to close iterator.", e);
             }
         }
@@ -101,16 +118,16 @@ public class CacheWeakQueryIteratorsHolder<V> {
      * Cancel all cache queries.
      */
     public void clearQueries(){
-        for (CacheQueryFuture<?> fut : futs.values()) {
+        for (AutoCloseable rsrc : refs.values()) {
             try {
-                fut.cancel();
+                rsrc.close();
             }
-            catch (IgniteCheckedException e) {
+            catch (Exception e) {
                 U.error(log, "Failed to close iterator.", e);
             }
         }
 
-        futs.clear();
+        refs.clear();
     }
 
 
@@ -119,7 +136,8 @@ public class CacheWeakQueryIteratorsHolder<V> {
      *
      * @param <T> Type for iterator.
      */
-    public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> {
+    private class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T>
+        implements WeakReferenceCloseableIterator<T> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -204,10 +222,8 @@ public class CacheWeakQueryIteratorsHolder<V> {
             cur = null;
         }
 
-        /**
-         * @return Iterator weak reference.
-         */
-        private WeakReference<WeakQueryFutureIterator<T>> weakReference() {
+        /** {@inheritDoc} */
+        @Override public WeakReference<WeakQueryFutureIterator<T>> weakReference() {
             return weakRef;
         }
 
@@ -217,7 +233,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
         private void clearWeakReference() {
             weakRef.clear();
 
-            futs.remove(weakRef);
+            refs.remove(weakRef);
         }
 
         /**
@@ -233,4 +249,109 @@ public class CacheWeakQueryIteratorsHolder<V> {
             }
         }
     }
-}
\ No newline at end of file
+
+    /**
+     * @param <T> Type.
+     */
+    public class WeakQueryCloseableIterator<T> extends GridCloseableIteratorAdapter<T>
+        implements WeakReferenceCloseableIterator<T> {
+        /** */
+        private static final long serialVersionUID = 0;
+
+        /** */
+        private final GridCloseableIterator<V> iter;
+
+        /** */
+        private final CacheIteratorConverter<T, V> convert;
+
+        /** */
+        private final WeakReference weakRef;
+
+        /** */
+        private T cur;
+
+        /**
+         * @param iter Iterator.
+         * @param convert Converter.
+         */
+        WeakQueryCloseableIterator(GridCloseableIterator<V> iter, CacheIteratorConverter<T, V> convert) {
+            this.iter = iter;
+            this.convert = convert;
+
+            weakRef = new WeakReference(this, refQueue);
+       }
+
+
+        /** {@inheritDoc} */
+        @Override protected T onNext() throws IgniteCheckedException {
+            V next;
+
+            try {
+                next = iter.nextX();
+            }
+            catch (NoSuchElementException e){
+                clearWeakReference();
+
+                throw e;
+            }
+
+            if (next == null)
+                clearWeakReference();
+
+            cur = next != null ? convert.convert(next) : null;
+
+            return cur;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
+            boolean hasNextX = iter.hasNextX();
+
+            if (!hasNextX)
+                clearWeakReference();
+
+            return hasNextX;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onRemove() throws IgniteCheckedException {
+            if (cur == null)
+                throw new IllegalStateException();
+
+            convert.remove(cur);
+
+            cur = null;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            iter.close();
+
+            clearWeakReference();
+        }
+
+        /**
+         * Clears weak reference.
+         */
+        private void clearWeakReference() {
+            weakRef.clear();
+
+            refs.remove(weakRef);
+        }
+
+        /** {@inheritDoc} */
+        @Override public WeakReference weakReference() {
+            return weakRef;
+        }
+    }
+
+    /**
+     *
+     */
+    public static interface WeakReferenceCloseableIterator<T> extends GridCloseableIterator<T> {
+        /**
+         * @return Iterator weak reference.
+         */
+        public WeakReference weakReference();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fbba82e..dd06ef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
@@ -107,6 +106,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridTriple;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -4088,7 +4088,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * @return Distributed ignite cache iterator.
      */
-    public Iterator<Cache.Entry<K, V>> igniteIterator() {
+    public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException {
         GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx;
 
         final CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4096,11 +4096,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1)
             return localIteratorHonorExpirePolicy(opCtx);
 
-        CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
+        final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary())
             .keepAll(false)
-            .execute();
+            .executeScanQuery();
 
-        return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
+        return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
             @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
                 return new CacheEntryImpl<>(e.getKey(), e.getValue());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d50bf0b..127f1be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1848,7 +1848,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Off-heap iterator.
      */
     public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
-        Integer part,
+        @Nullable Integer part,
         boolean primary,
         boolean backup)
     {
@@ -1859,8 +1859,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         checkIteratorQueue();
 
-        if (primary && backup)
-            return offheap.iterator(spaceName, c);
+        if (primary && backup) {
+            if (part == null)
+                return offheap.iterator(spaceName, c);
+            else
+                return offheap.iterator(spaceName, c, part);
+        }
 
         AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion();
 
@@ -1894,7 +1898,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled || (!primary && !backup))
             return new GridEmptyCloseableIterator<>();
 
-        if (primary && backup)
+        if (primary && backup && part == null)
             return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
                 private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9b7ac4c..12ec8b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -455,7 +456,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp)
         throws IgniteCheckedException {
         final CacheQuery<Map.Entry<K,V>> qry;
-        final CacheQueryFuture<Map.Entry<K,V>> fut;
 
         boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary();
 
@@ -467,14 +467,35 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             if (grp != null)
                 qry.projection(grp);
 
-            fut = ctx.kernalContext().query().executeQuery(ctx,
-                new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
-                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
-                        return qry.execute();
+            final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx,
+                new IgniteOutClosureX<GridCloseableIterator<Entry<K,V>>>() {
+                    @Override public GridCloseableIterator<Entry<K,V>> applyx() throws IgniteCheckedException {
+                        final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery();
+
+                        return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
+                            @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
+                                Map.Entry<K, V> next = iter0.nextX();
+
+                                return new CacheEntryImpl<>(next.getKey(), next.getValue());
+                            }
+
+                            @Override protected boolean onHasNext() throws IgniteCheckedException {
+                                return iter0.hasNextX();
+                            }
+
+                            @Override protected void onClose() throws IgniteCheckedException {
+                                iter0.close();
+                            }
+                        };
                     }
                 }, false);
+
+            return new QueryCursorImpl<>(iter);
         }
-        else if (filter instanceof TextQuery) {
+
+        final CacheQueryFuture<Map.Entry<K,V>> fut;
+
+        if (filter instanceof TextQuery) {
             TextQuery p = (TextQuery)filter;
 
             qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary);
@@ -1797,6 +1818,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         try {
             return ctx.cache().igniteIterator();
         }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
         finally {
             onLeave(gate, prev);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index e130e19..fe73f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -79,11 +79,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridMapEntry;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -299,16 +299,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
                 qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
 
-                try {
-                    CacheQueryFuture<Map.Entry<BinaryMetadataKey, BinaryMetadata>> fut = qry.execute();
-
-                    Map.Entry<BinaryMetadataKey, BinaryMetadata> next;
-
-                    while ((next = fut.next()) != null) {
-                        assert next.getKey() != null : next;
-                        assert next.getValue() != null : next;
+                try (GridCloseableIterator<Map.Entry<BinaryMetadataKey, BinaryMetadata>> entries = qry.executeScanQuery()) {
+                    for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : entries) {
+                        assert e.getKey() != null : e;
+                        assert e.getValue() != null : e;
 
-                        addClientCacheMetaData(next.getKey(), next.getValue());
+                        addClientCacheMetaData(e.getKey(), e.getValue());
                     }
                 }
                 catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 5f9dc61..47c6e89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.affinity.AffinityKey;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryMetrics;
@@ -24,6 +26,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cache.query.annotations.QueryTextField;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteReducer;
 import org.jetbrains.annotations.Nullable;
@@ -289,4 +292,9 @@ public interface CacheQuery<T> {
      * Resets metrics for this query.
      */
     public void resetMetrics();
-}
\ No newline at end of file
+
+    /**
+     * @return Scan query iterator.
+     */
+    public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java
index bb342b3..a0244d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java
@@ -26,16 +26,7 @@ import org.jetbrains.annotations.Nullable;
  * Cache query future returned by query execution.
  * Refer to {@link CacheQuery} documentation for more information.
  */
-public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>> {
-    /**
-     * Returns number of elements that are already fetched and can
-     * be returned from {@link #next()} method without blocking.
-     *
-     * @return Number of fetched elements which are available immediately.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public int available() throws IgniteCheckedException;
-
+public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>>, AutoCloseable {
     /**
      * Returns next element from result set.
      * <p>
@@ -62,4 +53,4 @@ public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>>
      * @throws IgniteCheckedException {@inheritDoc}
      */
     @Override public boolean cancel() throws IgniteCheckedException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 353fbd3..5f6cb8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
@@ -35,6 +36,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
@@ -55,7 +58,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 
 /**
- * Distributed query manager.
+ * Distributed query manager (for cache in REPLICATED / PARTITIONED cache mode).
  */
 public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManager<K, V> {
     /** */
@@ -512,29 +515,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing query on local node: " + qry);
-
-        GridCacheLocalQueryFuture<K, V, ?> fut = new GridCacheLocalQueryFuture<>(cctx, qry);
-
-        try {
-            qry.query().validate();
-
-            fut.execute();
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
-    }
-
-    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+    @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes) {
         assert cctx.config().getCacheMode() != LOCAL;
 
         if (log.isDebugEnabled())
@@ -550,7 +532,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
 
             String clsName = qry.query().queryClassName();
 
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
+            final GridCacheQueryRequest req = new GridCacheQueryRequest(
                 cctx.cacheId(),
                 reqId,
                 cctx.name(),
@@ -595,6 +577,76 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings({"unchecked", "serial"})
+    @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry,
+        Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        assert !cctx.isLocal() : cctx.name();
+        assert qry.type() == GridCacheQueryType.SCAN: qry;
+
+        GridCloseableIterator<Map.Entry<K, V>> locIter0 = null;
+
+        for (ClusterNode node : nodes) {
+            if (node.isLocal()) {
+                locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false);
+
+                Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1);
+
+                for (ClusterNode n : nodes) {
+                    // Equals by reference can be used here.
+                    if (n != node)
+                        rmtNodes.add(n);
+                }
+
+                nodes = rmtNodes;
+
+                break;
+            }
+        }
+
+        final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0;
+
+        final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+
+        final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes);
+
+        return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() {
+            /** */
+            private Map.Entry<K, V> cur;
+
+            @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException {
+                if (!onHasNext())
+                    throw new NoSuchElementException();
+
+                Map.Entry<K, V> e = cur;
+
+                cur = null;
+
+                return e;
+            }
+
+            @Override protected boolean onHasNext() throws IgniteCheckedException {
+                if (cur != null)
+                    return true;
+
+                if (locIter != null && locIter.hasNextX())
+                    cur = locIter.nextX();
+
+                return cur != null || (cur = fut.next()) != null;
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                super.onClose();
+
+                if (locIter != null)
+                    locIter.close();
+
+                if (fut != null)
+                    fut.cancel();
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
     @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) {
         assert cctx.config().getCacheMode() != LOCAL;
         assert qry != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
index 4e72f97..183abde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java
@@ -19,16 +19,18 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 
 /**
- * Local query manager.
+ * Local query manager (for cache in LOCAL cache mode).
  */
 public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V> {
     /** {@inheritDoc} */
@@ -80,32 +82,20 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) {
+    @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
         assert cctx.config().getCacheMode() == LOCAL;
 
-        if (log.isDebugEnabled())
-            log.debug("Executing query on local node: " + qry);
-
-        GridCacheLocalQueryFuture<K, V, ?> fut = new GridCacheLocalQueryFuture<>(cctx, qry);
-
-        try {
-            qry.query().validate();
-
-            fut.execute();
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+        throw new IgniteException("Distributed queries are not available for local cache " +
+            "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']');
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() == LOCAL;
+    @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+        Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        assert cctx.isLocal() : cctx.name();
 
-        throw new IgniteException("Distributed queries are not available for local cache " +
-            "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']');
+        throw new IgniteException("Distributed scan query are not available for local cache " +
+            "(use 'CacheQuery.executeScanQuery(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']');
     }
 
     /** {@inheritDoc} */
@@ -142,4 +132,4 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V
         throw new IgniteException("Distributed queries are not available for local cache " +
             "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']');
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index b948dc5..90e14f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -22,6 +22,8 @@ import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
@@ -40,9 +42,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -429,6 +434,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     @SuppressWarnings({"IfMayBeConditional", "unchecked"})
     private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
         @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
+        assert type != SCAN : this;
+
         Collection<ClusterNode> nodes;
 
         try {
@@ -440,7 +447,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
-        if (nodes.isEmpty() && (type != SCAN || part == null))
+        if (nodes.isEmpty())
             return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
 
         if (log.isDebugEnabled())
@@ -471,12 +478,44 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         if (type == SQL_FIELDS || type == SPI)
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
-        else if (type == SCAN && part != null && !cctx.isLocal())
-            return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
     }
 
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+    @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException {
+        assert type == SCAN: "Wrong processing of qyery: " + type;
+
+        Collection<ClusterNode> nodes = nodes();
+
+        cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+        if (nodes.isEmpty() && part == null)
+            return new GridEmptyCloseableIterator();
+
+        if (log.isDebugEnabled())
+            log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
+
+        if (cctx.deploymentEnabled())
+            cctx.deploy().registerClasses(filter);
+
+        if (subjId == null)
+            subjId = cctx.localNodeId();
+
+        taskHash = cctx.kernalContext().job().currentTaskNameHash();
+
+        final GridCacheQueryManager qryMgr = cctx.queries();
+
+        if (part != null && !cctx.isLocal())
+            return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+        else {
+            boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
+
+            return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes);
+        }
+    }
+
     /**
      * @return Nodes to execute on.
      */
@@ -549,10 +588,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * Wrapper for queries with fallback.
      */
-    private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>>
-        implements CacheQueryFuture<R> {
+    private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter<Map.Entry> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
         /** Query future. */
-        private volatile GridCacheQueryFutureAdapter<?, ?, R> fut;
+        private volatile T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> tuple;
 
         /** Backups. */
         private volatile Queue<ClusterNode> nodes;
@@ -564,7 +605,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private volatile int unreservedNodesRetryCnt = 5;
 
         /** Bean. */
-        private final GridCacheQueryBean bean;
+        private final GridCacheQueryAdapter qry;
 
         /** Query manager. */
         private final GridCacheQueryManager qryMgr;
@@ -578,15 +619,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         /** Flag indicating that a first item has been returned to a user. */
         private boolean firstItemReturned;
 
+        /** Entry prefetched by {@code onHasNext()}; reset to {@code null} once {@code onNext()} returns it. */
+        private Map.Entry cur;
+
         /**
          * @param part Partition.
-         * @param bean Bean.
+         * @param qry Query.
          * @param qryMgr Query manager.
          * @param cctx Cache context.
          */
-        private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean,
+        private ScanQueryFallbackClosableIterator(int part, GridCacheQueryAdapter qry,
             GridCacheQueryManager qryMgr, GridCacheContext cctx) {
-            this.bean = bean;
+            this.qry = qry;
             this.qryMgr = qryMgr;
             this.cctx = cctx;
             this.part = part;
@@ -628,46 +672,82 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private void init() {
             final ClusterNode node = nodes.poll();
 
-            fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
-                qryMgr.queryLocal(bean) :
-                qryMgr.queryDistributed(bean, Collections.singleton(node)));
-        }
+            if (node.isLocal()) {
+                try {
+                    GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true);
 
-        /** {@inheritDoc} */
-        @Override public int available() {
-            return fut.available();
-        }
+                    tuple= new T2(it, null);
+                }
+                catch (IgniteClientDisconnectedCheckedException e) {
+                    throw CU.convertToCacheException(e);
+                }
+                catch (IgniteCheckedException e) {
+                    retryIfPossible(e);
+                }
+            }
+            else {
+                final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
 
-        /** {@inheritDoc} */
-        @Override public boolean cancel() throws IgniteCheckedException {
-            return fut.cancel();
+                GridCacheQueryFutureAdapter fut =
+                    (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node));
+
+                tuple = new T2(null, fut);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<R> get() throws IgniteCheckedException {
-            assert false;
+        @Override protected Map.Entry onNext() throws IgniteCheckedException {
+            if (!onHasNext())
+                throw new NoSuchElementException();
 
-            return super.get();
+            assert cur != null;
+
+            Map.Entry e = cur;
+
+            cur = null;
+
+            return e;
         }
 
         /** {@inheritDoc} */
-        @Override public R next() {
-            if (firstItemReturned)
-                return fut.next();
-
+        @Override protected boolean onHasNext() throws IgniteCheckedException {
             while (true) {
-                try {
-                    fut.awaitFirstPage();
+                if (cur != null)
+                    return true;
 
-                    firstItemReturned = true;
+                T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
 
-                    return fut.next();
-                }
-                catch (IgniteClientDisconnectedCheckedException e) {
-                    throw CU.convertToCacheException(e);
+                GridCloseableIterator<Map.Entry> iter = t.get1();
+
+                if (iter != null) {
+                    boolean hasNext = iter.hasNext();
+
+                    if (hasNext)
+                        cur = iter.next();
+
+                    return hasNext;
                 }
-                catch (IgniteCheckedException e) {
-                    retryIfPossible(e);
+                else {
+                    GridCacheQueryFutureAdapter fut = t.get2();
+
+                    assert fut != null;
+
+                    if (firstItemReturned)
+                        return (cur = (Map.Entry)fut.next()) != null;
+
+                    try {
+                        fut.awaitFirstPage();
+
+                        firstItemReturned = true;
+
+                        return (cur = (Map.Entry)fut.next()) != null;
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e) {
+                        throw CU.convertToCacheException(e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        retryIfPossible(e);
+                    }
                 }
             }
         }
@@ -679,8 +759,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
             try {
                 IgniteInternalFuture<?> retryFut;
 
-                if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
-                    AffinityTopologyVersion waitVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
+                GridDhtUnreservedPartitionException partErr = X.cause(e, GridDhtUnreservedPartitionException.class);
+
+                if (partErr != null) {
+                    AffinityTopologyVersion waitVer = partErr.topologyVersion();
 
                     assert waitVer != null;
 
@@ -720,5 +802,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 throw CU.convertToCacheException(ex);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            super.onClose();
+
+            T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
+
+            if (t != null && t.get1() != null)
+                t.get1().close();
+
+            if (t != null && t.get2() != null)
+                t.get2().cancel();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
index fd8c4d8..ac14ae6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java
@@ -36,14 +36,14 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection<
     }
 
     /** {@inheritDoc} */
-    @Override public int available() throws IgniteCheckedException {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public T next() throws IgniteCheckedException {
         get();
 
         return null;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        cancel();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index e3e5d98..db519f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -161,11 +161,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /** {@inheritDoc} */
-    @Override public int available() {
-        return cnt.get();
-    }
-
-    /** {@inheritDoc} */
     @Override public R next() {
         try {
             R next = unmaskNull(internalIterator().next());
@@ -571,6 +566,11 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     }
 
     /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        cancel();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheQueryFutureAdapter.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index df95e2e..3b3c5f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -38,6 +38,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
@@ -465,7 +466,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param qry Query.
      * @return Query future.
      */
-    public abstract CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry);
+    @SuppressWarnings("unchecked")
+    public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) {
+        assert qry.query().type() != GridCacheQueryType.SCAN : qry;
+
+        if (log.isDebugEnabled())
+            log.debug("Executing query on local node: " + qry);
+
+        GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry);
+
+        try {
+            qry.query().validate();
+
+            fut.execute();
+        }
+        catch (IgniteCheckedException e) {
+            if (fut != null)
+                fut.onDone(e);
+        }
+
+        return fut;
+    }
 
     /**
      * Executes distributed query.
@@ -477,6 +498,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     public abstract CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes);
 
     /**
+     * Executes distributed SCAN query.
+     *
+     * @param qry Query.
+     * @param nodes Nodes.
+     * @return Iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry,
+        Collection<ClusterNode> nodes) throws IgniteCheckedException;
+
+    /**
      * Loads page.
      *
      * @param id Query ID.
@@ -590,7 +622,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = scanIterator(qry);
+                    iter = scanIterator(qry, false);
 
                     break;
 
@@ -799,18 +831,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /**
      * @param qry Query.
+     * @param locNode Local node.
      * @return Full-scan row iterator.
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry)
+    private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
         throws IgniteCheckedException {
-        IgniteInternalCache<K, V> prj0 = cctx.cache();
-
-        prj0 = prj0.keepBinary();
-
-        final IgniteInternalCache prj = prj0;
-
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
         try {
@@ -822,56 +849,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             final boolean backups = qry.includeBackups() || cctx.isReplicated();
 
-            Iterator<K> keyIter;
-
-            GridDhtLocalPartition locPart = null;
-
-            Integer part = qry.partition();
-
-            if (part == null || cctx.isLocal())
-                keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator();
-            else if (part < 0 || part >= cctx.affinity().partitions())
-                keyIter = F.emptyIterator();
-            else {
-                final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
-
-                locPart = dht.topology().localPartition(part, topVer, false);
-
-                // double check for owning state
-                if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING)
-                    throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
-                        "Partition can not be reserved");
-
-                final GridDhtLocalPartition locPart0 = locPart;
-
-                keyIter = new Iterator<K>() {
-                    private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
-
-                    @Override public boolean hasNext() {
-                        return iter0.hasNext();
-                    }
-
-                    @Override public K next() {
-                        return (K)iter0.next();
-                    }
-
-                    @Override public void remove() {
-                        iter0.remove();
-                    }
-                };
-            }
-
-            final GridDhtLocalPartition locPart0 = locPart;
-
-            final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
-                new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), true) {
-                    @Override protected void onClose() {
-                        super.onClose();
-
-                        if (locPart0 != null)
-                            locPart0.release();
-                    }
-                };
+            final GridIterator<IgniteBiTuple<K, V>> heapIt = onheapIterator(qry,
+                topVer,
+                keyValFilter,
+                backups,
+                plc,
+                locNode);
 
             final GridIterator<IgniteBiTuple<K, V>> it;
 
@@ -881,10 +864,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 iters.add(heapIt);
 
                 if (cctx.isOffHeapEnabled())
-                    iters.add(offheapIterator(qry, topVer, backups, plc));
+                    iters.add(offheapIterator(qry, topVer, backups, plc, locNode));
 
                 if (cctx.swap().swapEnabled())
-                    iters.add(swapIterator(qry, topVer, backups, plc));
+                    iters.add(swapIterator(qry, topVer, backups, plc, locNode));
 
                 it = new CompoundIterator<>(iters);
             }
@@ -906,7 +889,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 @Override protected void onClose() throws IgniteCheckedException {
                     try {
-                        heapIt.close();
+                        if (heapIt instanceof IgniteSpiCloseableIterator)
+                            ((IgniteSpiCloseableIterator)heapIt).close();
                     }
                     finally {
                         closeScanFilter(keyValFilter);
@@ -914,8 +898,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
             };
         }
-        catch (IgniteCheckedException | RuntimeException e)
-        {
+        catch (IgniteCheckedException | RuntimeException e) {
             closeScanFilter(keyValFilter);
 
             throw e;
@@ -934,7 +917,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /**
      * @param qry Query.
+     * @param topVer Topology version.
      * @param backups Include backups.
+     * @param expPlc Expiry policy.
+     * @param locNode Local node.
      * @return Swap iterator.
      * @throws IgniteCheckedException If failed.
      */
@@ -942,8 +928,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         GridCacheQueryAdapter<?> qry,
         AffinityTopologyVersion topVer,
         boolean backups,
-        ExpiryPolicy expPlc
-    ) throws IgniteCheckedException {
+        ExpiryPolicy expPlc,
+        boolean locNode) throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
         Integer part = qry.partition();
@@ -957,22 +943,146 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 topVer,
                 filter,
                 expPlc,
-                qry.keepBinary());
+                qry.keepBinary(), locNode);
 
-        return scanIterator(it, filter, qry.keepBinary());
+        return scanIterator(it, filter, qry.keepBinary(), locNode);
     }
 
     /**
      * @param qry Query.
+     * @param topVer Topology version.
+     * @param keyValFilter Filter.
      * @param backups Include backups.
+     * @param plc Expiry policy.
+     * @param locNode Local node.
+     * @return On-heap iterator.
+     * @throws GridDhtUnreservedPartitionException If failed to reserve partition.
+     */
+    private GridIterator<IgniteBiTuple<K, V>> onheapIterator(
+        GridCacheQueryAdapter<?> qry,
+        AffinityTopologyVersion topVer,
+        final IgniteBiPredicate<K, V> keyValFilter,
+        boolean backups,
+        final ExpiryPolicy plc,
+        final boolean locNode) throws GridDhtUnreservedPartitionException {
+        Iterator<K> keyIter;
+
+        GridDhtLocalPartition locPart = null;
+
+        Integer part = qry.partition();
+
+        if (part == null || cctx.isLocal()) {
+            // Performance optimization.
+            if (locNode && plc == null && !cctx.isLocal()) {
+                GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+
+                final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true, backups);
+
+                return new GridIteratorAdapter<IgniteBiTuple<K, V>>() {
+                    /** */
+                    private IgniteBiTuple<K, V> next;
+
+                    {
+                        advance();
+                    }
+
+                    @Override public boolean hasNextX() throws IgniteCheckedException {
+                        return next != null;
+                    }
+
+                    @Override public IgniteBiTuple<K, V> nextX() throws IgniteCheckedException {
+                        if (next == null)
+                            throw new NoSuchElementException();
+
+                        IgniteBiTuple<K, V> next0 = next;
+
+                        advance();
+
+                        return next0;
+                    }
+
+                    @Override public void removeX() throws IgniteCheckedException {
+                        // No-op.
+                    }
+
+                    private void advance() {
+                        IgniteBiTuple<K, V> next0 = null;
+
+                        while (iter.hasNext()) {
+                            Cache.Entry<K, V> cacheEntry = iter.next();
+
+                            if (keyValFilter != null && !keyValFilter.apply(cacheEntry.getKey(), cacheEntry.getValue()))
+                                continue;
+
+                            next0 = new IgniteBiTuple<>(cacheEntry.getKey(), cacheEntry.getValue());
+
+                            break;
+                        }
+
+                        next = next0;
+                    }
+                };
+            }
+
+            IgniteInternalCache<K, V> keepBinaryCache = cctx.cache().keepBinary();
+
+            keyIter = backups ? keepBinaryCache.keySetx().iterator() : keepBinaryCache.primaryKeySet().iterator();
+        }
+        else if (part < 0 || part >= cctx.affinity().partitions())
+            keyIter = F.emptyIterator();
+        else {
+            final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+
+            locPart = dht.topology().localPartition(part, topVer, false);
+
+            // Double check for owning state.
+            if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING)
+                throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+                    "Partition can not be reserved.");
+
+            final GridDhtLocalPartition locPart0 = locPart;
+
+            keyIter = new Iterator<K>() {
+                private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator();
+
+                @Override public boolean hasNext() {
+                    return iter0.hasNext();
+                }
+
+                @Override public K next() {
+                    return (K)iter0.next();
+                }
+
+                @Override public void remove() {
+                    iter0.remove();
+                }
+            };
+        }
+
+        final GridDhtLocalPartition locPart0 = locPart;
+
+        return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) {
+                @Override protected void onClose() {
+                    super.onClose();
+
+                    if (locPart0 != null)
+                        locPart0.release();
+                }
+            };
+    }
+
+    /**
+     * @param qry Query.
+     * @param backups Include backups.
+     * @param locNode Local node.
      * @return Offheap iterator.
      */
     private GridIterator<IgniteBiTuple<K, V>> offheapIterator(
         GridCacheQueryAdapter<?> qry,
         AffinityTopologyVersion topVer,
         boolean backups,
-        ExpiryPolicy expPlc
-    ) {
+        ExpiryPolicy expPlc,
+        boolean locNode) {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
         if (expPlc != null) {
@@ -981,18 +1091,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 topVer,
                 filter,
                 expPlc,
-                qry.keepBinary());
+                qry.keepBinary(), locNode);
         }
 
         if (cctx.offheapTiered() && filter != null) {
-            OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary());
+            OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary(), locNode);
 
             return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups);
         }
         else {
             Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups);
 
-            return scanIterator(it, filter, qry.keepBinary());
+            return scanIterator(it, filter, qry.keepBinary(), locNode);
         }
     }
 
@@ -1000,12 +1110,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param it Lazy swap or offheap iterator.
      * @param filter Scan filter.
      * @param keepBinary Keep binary flag.
+     * @param locNode Local node.
      * @return Iterator.
      */
     private GridIteratorAdapter<IgniteBiTuple<K, V>> scanIterator(
         @Nullable final Iterator<Map.Entry<byte[], byte[]>> it,
         @Nullable final IgniteBiPredicate<K, V> filter,
-        final boolean keepBinary) {
+        final boolean keepBinary,
+        final boolean locNode) {
         if (it == null)
             return new GridEmptyCloseableIterator<>();
 
@@ -1041,15 +1153,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 while (it.hasNext()) {
                     final LazySwapEntry e = new LazySwapEntry(it.next());
 
-                    if (filter != null) {
-                        K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary);
-                        V val = (V)cctx.unwrapBinaryIfNeeded(e.value(), keepBinary);
+                    K key = e.key();
+                    V val = e.value();
 
-                        if (!filter.apply(key, val))
-                            continue;
+                    if (filter != null || locNode) {
+                        key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
+                        val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary);
                     }
 
-                    next = new IgniteBiTuple<>(e.key(), e.value());
+                    if (filter != null && !filter.apply(key, val))
+                        continue;
+
+                    next = new IgniteBiTuple<>(key, val);
 
                     break;
                 }
@@ -1063,6 +1178,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @param filter Filter.
      * @param expPlc Expiry policy.
      * @param keepBinary Keep binary flag.
+     * @param locNode Local node.
      * @return Final key-value iterator.
      */
     private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator(
@@ -1070,8 +1186,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         AffinityTopologyVersion topVer,
         @Nullable final IgniteBiPredicate<K, V> filter,
         ExpiryPolicy expPlc,
-        final boolean keepBinary
-    ) {
+        final boolean keepBinary,
+        boolean locNode) {
         Iterator <K> keyIter = new Iterator<K>() {
             /** {@inheritDoc} */
             @Override public boolean hasNext() {
@@ -1096,7 +1212,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             }
         };
 
-        return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, false);
+        return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, locNode, false);
     }
 
     /**
@@ -1317,6 +1433,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     @SuppressWarnings("unchecked")
     protected void runQuery(GridCacheQueryInfo qryInfo) {
         assert qryInfo != null;
+        assert qryInfo.query().type() != SCAN || !qryInfo.local() : qryInfo;
 
         if (!enterBusy()) {
             if (cctx.localNodeId().equals(qryInfo.senderId()))
@@ -1438,6 +1555,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
 
                     if (readEvt) {
+                        K key0  = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+                        V val0  = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+
                         switch (type) {
                             case SQL:
                                 cctx.gridEvents().record(new CacheQueryReadEvent<>(
@@ -1453,8 +1573,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                                     qryInfo.arguments(),
                                     qry.subjectId(),
                                     taskName,
-                                    key,
-                                    val,
+                                    key0,
+                                    val0,
                                     null,
                                     null));
 
@@ -1474,8 +1594,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                                     null,
                                     qry.subjectId(),
                                     taskName,
-                                    key,
-                                    val,
+                                    key0,
+                                    val0,
                                     null,
                                     null));
 
@@ -1495,8 +1615,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                                     null,
                                     qry.subjectId(),
                                     taskName,
-                                    key,
-                                    val,
+                                    key0,
+                                    val0,
                                     null,
                                     null));
 
@@ -1588,12 +1708,127 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
+     * Process local scan query.
+     *
+     * @param qry Query.
+     * @param updStatisticsIfNeeded Update statistics flag.
+     */
+    @SuppressWarnings({"unchecked", "serial"})
+    protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry,
+        final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
+        if (!enterBusy())
+            throw new IllegalStateException("Failed to process query request (grid is stopping).");
+
+        final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+
+        boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled;
+
+        long startTime = U.currentTimeMillis();
+
+        try {
+            assert qry.type() == SCAN;
+
+            if (log.isDebugEnabled())
+                log.debug("Running local SCAN query: " + qry);
+
+            final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
+            final IgniteBiPredicate filter = qry.scanFilter();
+            final String namex = cctx.namex();
+            final ClusterNode locNode = cctx.localNode();
+            final UUID subjId = qry.subjectId();
+
+            if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
+                    locNode,
+                    "Scan query executed.",
+                    EVT_CACHE_QUERY_EXECUTED,
+                    CacheQueryType.SCAN.name(),
+                    namex,
+                    null,
+                    null,
+                    filter,
+                    null,
+                    null,
+                    subjId,
+                    taskName));
+            }
+
+            final GridCloseableIterator<IgniteBiTuple<K, V>> iter = scanIterator(qry, true);
+
+            if (updStatisticsIfNeeded) {
+                needUpdStatistics = false;
+
+                cctx.queries().onCompleted(U.currentTimeMillis() - startTime, false);
+            }
+
+            final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
+            return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
+                @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
+                    long start = statsEnabled ? System.nanoTime() : 0L;
+
+                    IgniteBiTuple<K, V> next = iter.nextX();
+
+                    if (statsEnabled) {
+                        CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                        metrics.onRead(true);
+
+                        metrics.addGetTimeNanos(System.nanoTime() - start);
+                    }
+
+                    if (readEvt) {
+                        cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                            cctx.localNode(),
+                            "Scan query entry read.",
+                            EVT_CACHE_QUERY_OBJECT_READ,
+                            CacheQueryType.SCAN.name(),
+                            namex,
+                            null,
+                            null,
+                            filter,
+                            null,
+                            null,
+                            subjId,
+                            taskName,
+                            next.getKey(),
+                            next.getValue(),
+                            null,
+                            null));
+                    }
+
+                    return next;
+                }
+
+                @Override protected boolean onHasNext() throws IgniteCheckedException {
+                    return iter.hasNextX();
+                }
+
+                @Override protected void onClose() throws IgniteCheckedException {
+                    iter.close();
+                }
+            };
+        }
+        catch (Exception e) {
+            if (needUpdStatistics)
+                cctx.queries().onCompleted(U.currentTimeMillis() - startTime, true);
+
+            throw e;
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
      * @param qryInfo Info.
      * @param taskName Task name.
      * @return Iterator.
      * @throws IgniteCheckedException In case of error.
      */
-    private QueryResult<K, V> queryResult(GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
+    private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
+        assert qryInfo != null;
+
         final UUID sndId = qryInfo.senderId();
 
         assert sndId != null;
@@ -1601,8 +1836,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.get(sndId);
 
         if (futs == null) {
-            futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>>(
-                16, 0.75f, true) {
+            futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>>(16, 0.75f, true) {
                 @Override protected boolean removeEldestEntry(Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> e) {
                     boolean rmv = size() > maxIterCnt;
 
@@ -1625,22 +1859,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 futs = old;
         }
 
-        return queryResult(futs, qryInfo, taskName);
-    }
-
-    /**
-     * @param futs Futures map.
-     * @param qryInfo Info.
-     * @return Iterator.
-     * @throws IgniteCheckedException In case of error.
-     */
-    @SuppressWarnings({
-        "SynchronizationOnLocalVariableOrMethodParameter",
-        "NonPrivateFieldAccessedInSynchronizedContext"})
-    private QueryResult<K, V> queryResult(Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs,
-        GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException {
         assert futs != null;
-        assert qryInfo != null;
 
         GridFutureAdapter<QueryResult<K, V>> fut;
 
@@ -2598,17 +2817,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** */
         private boolean keepBinary;
 
+        /** */
+        private boolean locNode;
+
         /**
          * @param filter Filter.
          * @param keepBinary Keep binary flag.
+         * @param locNode Local node.
          */
         private OffheapIteratorClosure(
             @Nullable IgniteBiPredicate<K, V> filter,
-            boolean keepBinary) {
+            boolean keepBinary,
+            boolean locNode) {
             assert filter != null;
 
             this.filter = filter;
             this.keepBinary = keepBinary;
+            this.locNode = locNode;
         }
 
         /** {@inheritDoc} */
@@ -2623,15 +2848,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (!filter.apply(key, val))
                 return null;
 
-            if (key instanceof CacheObject)
-                ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
+            if (locNode)
+                return new IgniteBiTuple<>(key, val);
+            else{
+                if (key instanceof CacheObject)
+                    ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
 
-            val = (V)cctx.unwrapTemporary(e.value());
+                val = (V)cctx.unwrapTemporary(e.value());
 
-            if (val instanceof CacheObject)
-                ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
+                if (val instanceof CacheObject)
+                    ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
 
-            return new IgniteBiTuple<>(e.key(), val);
+                return new IgniteBiTuple<>(key, val);
+            }
         }
     }
 
@@ -3112,6 +3341,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** */
         private final IgniteBiPredicate<K, V> keyValFilter;
 
+        /** */
+        private boolean locNode;
+
         /** Heap only flag. */
         private boolean heapOnly;
 
@@ -3133,6 +3365,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
          * @param topVer Topology version.
          * @param keyValFilter Key-value filter.
          * @param keepBinary Keep binary flag from the query.
+         * @param locNode Local node.
+         * @param heapOnly Heap only.
          */
         private PeekValueExpiryAwareIterator(
             Iterator<K> keyIt,
@@ -3140,12 +3374,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             AffinityTopologyVersion topVer,
             IgniteBiPredicate<K, V> keyValFilter,
             boolean keepBinary,
+            boolean locNode,
             boolean heapOnly
         ) {
             this.keyIt = keyIt;
             this.plc = plc;
             this.topVer = topVer;
             this.keyValFilter = keyValFilter;
+            this.locNode = locNode;
             this.heapOnly = heapOnly;
 
             dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
@@ -3209,11 +3445,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
 
                 if (val != null) {
+                    boolean keepBinary0 = !locNode || keepBinary;
+
                     next0 = F.t(
-                        (K)cctx.unwrapBinaryIfNeeded(key, true),
-                        (V)cctx.unwrapBinaryIfNeeded(val, true));
+                        (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0),
+                        (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0));
+
+                    boolean passPred = true;
 
-                    if (checkPredicate(next0))
+                    if (keyValFilter != null) {
+                        Object key0 = next0.getKey();
+                        Object val0 = next0.getValue();
+
+                        if (keepBinary0 && !keepBinary) {
+                            key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary);
+                            val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary);
+                        }
+
+                        passPred = keyValFilter.apply((K)key0, (V)val0);
+                    }
+
+                    if (passPred)
                         break;
                     else
                         next0 = null;
@@ -3260,21 +3512,5 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
             }
         }
-
-        /**
-         * Check key-value predicate.
-         *
-         * @param e Entry to check.
-         * @return Filter evaluation result.
-         */
-        private boolean checkPredicate(Map.Entry<K, V> e) {
-            if (keyValFilter != null) {
-                Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary);
-
-                return keyValFilter.apply(e0.getKey(), e0.getValue());
-            }
-
-            return true;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f25e361..2f9f9d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -389,7 +389,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
 
             CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
 
-            CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it =
+            CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator it =
                 ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() {
                     @Override protected T convert(Map.Entry<T, ?> e) {
                         return e.getKey();
@@ -627,4 +627,4 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
             setName = U.readString(in);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/bcb3e104/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 1e1182f..b5634e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -1098,35 +1099,41 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("unchecked")
     private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
-        if (!cache.context().affinityNode()) {
-            ClusterNode oldestSrvNode =
-                CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+        try {
+            if (!cache.context().affinityNode()) {
+                ClusterNode oldestSrvNode =
+                    CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
 
             if (oldestSrvNode == null)
                 return F.emptyIterator();
 
-            GridCacheQueryManager qryMgr = cache.context().queries();
+                GridCacheQueryManager qryMgr = cache.context().queries();
 
-            CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
+                CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false);
 
-            qry.keepAll(false);
+                qry.keepAll(false);
 
-            qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+                qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
 
-            return cache.context().itHolder().iterator(qry.execute(),
-                new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() {
-                    @Override protected Object convert(Map.Entry<Object, Object> e) {
-                        return new CacheEntryImpl<>(e.getKey(), e.getValue());
-                    }
+                GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery();
 
-                    @Override protected void remove(Object item) {
-                        throw new UnsupportedOperationException();
-                    }
-                }
-            );
+                return cache.context().itHolder().iterator(iter,
+                    new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object,Object>>() {
+                        @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
+                            return new CacheEntryImpl<>(e.getKey(), e.getValue());
+                        }
+
+                        @Override protected void remove(Cache.Entry<Object, Object> item) {
+                            throw new UnsupportedOperationException();
+                        }
+                    });
+            }
+            else
+                return cache.entrySetx().iterator();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
         }
-        else
-            return cache.entrySetx().iterator();
     }
 
     /**


Mime
View raw message