ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [2/2] ignite git commit: IGNITE-6101 Try to improve local scans performance
Date Fri, 25 Aug 2017 09:22:59 GMT
IGNITE-6101 Try to improve local scans performance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1c9d80a5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1c9d80a5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1c9d80a5

Branch: refs/heads/master
Commit: 1c9d80a540cbce0a9d9a65e3fac2e06f53b73f43
Parents: 79d47f8
Author: Igor Seliverstov <gvvinblade@gmail.com>
Authored: Fri Aug 25 12:22:44 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Fri Aug 25 12:22:44 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/binary/BinaryUtils.java     |  26 +
 .../processors/cache/CacheObjectUtils.java      |  65 ++-
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../processors/cache/GridCacheEntryEx.java      |  10 +
 .../processors/cache/GridCacheMapEntry.java     |  27 +-
 .../processors/cache/IgniteCacheProxyImpl.java  |  26 +-
 .../colocated/GridDhtDetachedCacheEntry.java    |   4 +-
 .../distributed/near/GridNearCacheEntry.java    |   4 +-
 .../processors/cache/query/CacheQueryEntry.java |  58 +++
 .../query/GridCacheDistributedQueryManager.java |  16 +-
 .../cache/query/GridCacheQueryAdapter.java      |  53 ++-
 .../cache/query/GridCacheQueryManager.java      | 470 +++++++++----------
 .../IgniteCacheObjectProcessorImpl.java         | 164 -------
 .../UserCacheObjectByteArrayImpl.java           |  59 +++
 .../cacheobject/UserCacheObjectImpl.java        |  82 ++++
 .../cacheobject/UserKeyCacheObjectImpl.java     | 101 ++++
 .../service/GridServiceProcessor.java           |   6 +-
 .../resources/META-INF/classnames.properties    |  88 ++--
 .../processors/cache/GridCacheTestEntryEx.java  |   6 +
 .../GridCacheQueryTransformerSelfTest.java      |  41 ++
 20 files changed, 788 insertions(+), 524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 74d1730..8970a4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -60,6 +60,12 @@ import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.internal.binary.builder.BinaryLazyValue;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
+import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl;
 import org.apache.ignite.internal.util.MutableSingletonList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -707,6 +713,26 @@ public class BinaryUtils {
     }
 
     /**
+     * @param obj Object to check.
+     * @return True if this is an object of a known type.
+     */
+    public static boolean knownCacheObject(Object obj) {
+        if (obj == null)
+            return false;
+
+        Class<?> cls= obj.getClass();
+
+        return cls == KeyCacheObjectImpl.class ||
+            cls == BinaryObjectImpl.class ||
+            cls == CacheObjectImpl.class ||
+            cls == CacheObjectByteArrayImpl.class ||
+            cls == BinaryEnumObjectImpl.class ||
+            cls == UserKeyCacheObjectImpl.class ||
+            cls == UserCacheObjectImpl.class ||
+            cls == UserCacheObjectByteArrayImpl.class;
+    }
+
+    /**
      * @param arr Array to check.
      * @return {@code true} if this array is of a known type.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
index 5afa751..aeca79e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -17,15 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.util.MutableSingletonList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
 /**
  * Cache object utility methods.
  */
@@ -36,10 +35,35 @@ public class CacheObjectUtils {
      * @param cpy Copy value flag.
      * @return Unwrapped object.
      */
+    public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, CacheObject o, boolean keepBinary, boolean cpy) {
+        return unwrapBinary(ctx, o, keepBinary, cpy);
+    }
+
+    /**
+     * @param o Object to unwrap.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy value flag.
+     * @return Unwrapped object.
+     */
     public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
         if (o == null)
             return null;
 
+        // TODO has to be overloaded
+        if (o instanceof Map.Entry) {
+            Map.Entry entry = (Map.Entry)o;
+
+            Object key = entry.getKey();
+
+            Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+
+            Object val = entry.getValue();
+
+            Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+
+            return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+        }
+
         return unwrapBinary(ctx, o, keepBinary, cpy);
     }
 
@@ -86,7 +110,10 @@ public class CacheObjectUtils {
         Map<Object, Object> map0 = BinaryUtils.newMap(map);
 
         for (Map.Entry<Object, Object> e : map.entrySet())
-            map0.put(unwrapBinary(ctx, e.getKey(), false, cpy), unwrapBinary(ctx, e.getValue(), false, cpy));
+            // TODO why don't we use keepBinary parameter here?
+            map0.put(
+                unwrapBinary(ctx, e.getKey(), false, cpy),
+                unwrapBinary(ctx, e.getValue(), false, cpy));
 
         return map0;
     }
@@ -105,7 +132,7 @@ public class CacheObjectUtils {
             col0 = new ArrayList<>(col.size());
 
         for (Object obj : col)
-            col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+            col0.add(unwrapBinaryIfNeeded(ctx, obj, keepBinary, cpy));
 
         return col0;
     }
@@ -137,31 +164,25 @@ public class CacheObjectUtils {
      */
     @SuppressWarnings("unchecked")
     private static Object unwrapBinary(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
-        if (o instanceof Map.Entry) {
-            Map.Entry entry = (Map.Entry)o;
-
-            Object key = entry.getKey();
-
-            Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+        if (o == null)
+            return o;
 
-            Object val = entry.getValue();
+        while (BinaryUtils.knownCacheObject(o)) {
+            CacheObject co = (CacheObject)o;
 
-            Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+            if (!co.isPlatformType() && keepBinary)
+                return o;
 
-            return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+            // It may be a collection of binaries
+            o = co.value(ctx, cpy);
         }
-        else if (BinaryUtils.knownCollection(o))
+
+        if (BinaryUtils.knownCollection(o))
             return unwrapKnownCollection(ctx, (Collection<Object>)o, keepBinary, cpy);
         else if (BinaryUtils.knownMap(o))
             return unwrapBinariesIfNeeded(ctx, (Map<Object, Object>)o, keepBinary, cpy);
         else if (o instanceof Object[])
             return unwrapBinariesInArrayIfNeeded(ctx, (Object[])o, keepBinary, cpy);
-        else if (o instanceof CacheObject) {
-            CacheObject co = (CacheObject)o;
-
-            if (!keepBinary || co.isPlatformType())
-                return unwrapBinary(ctx, co.value(ctx, cpy), keepBinary, cpy);
-        }
 
         return o;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index fed716c..8e346ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -83,13 +83,13 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -3916,7 +3916,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
             @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
-                return new CacheEntryImpl<>(e.getKey(), e.getValue());
+                // Actually Scan Query returns Iterator<CacheQueryEntry> by default,
+                // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces.
+                return (Cache.Entry<K, V>) e;
             }
 
             @Override protected void remove(Cache.Entry<K, V> item) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index e2bc7ff..b2cabac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -934,6 +935,15 @@ public interface GridCacheEntryEx {
         throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
+     * @param row Already extracted value.
+     * @return Value.
+     * @throws IgniteCheckedException If failed to read from swap storage.
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
+    @Nullable public CacheObject unswap(CacheDataRow row)
+        throws IgniteCheckedException, GridCacheEntryRemovedException;
+
+    /**
      * Unswap ignoring flags.
      *
      * @param needVal If {@code false} then do not need to deserialize value during unswap.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index d991c86..61f6fb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -342,9 +342,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public final CacheObject unswap(CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException {
+        row = unswap(row, true);
+
+        return row != null ? row.value() : null;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public final CacheObject unswap(boolean needVal)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        CacheDataRow row = unswap(needVal, true);
+        CacheDataRow row = unswap(null, true);
 
         return row != null ? row.value() : null;
     }
@@ -352,13 +359,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * Unswaps an entry.
      *
-     * @param needVal If {@code false} then do not to deserialize value during unswap.
+     * @param row Already extracted cache data.
      * @param checkExpire If {@code true} checks for expiration, as result entry can be obsoleted or marked deleted.
      * @return Value.
      * @throws IgniteCheckedException If failed.
      * @throws GridCacheEntryRemovedException If entry was removed.
      */
-    @Nullable protected CacheDataRow unswap(boolean needVal, boolean checkExpire)
+    @Nullable protected CacheDataRow unswap(@Nullable CacheDataRow row, boolean checkExpire)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         boolean obsolete = false;
         boolean deferred = false;
@@ -368,7 +375,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             checkObsolete();
 
             if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
-                CacheDataRow read = cctx.offheap().read(this);
+                assert row == null || row.key() == key: "Unexpected row key";
+
+                CacheDataRow read = row == null ? cctx.offheap().read(this) : row;
 
                 flags |= IS_UNSWAPPED_MASK;
 
@@ -572,7 +581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 if (val == null) {
                     if (isStartVersion()) {
-                        unswap(true, false);
+                        unswap(null, false);
 
                         val = this.val;
                     }
@@ -1322,7 +1331,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             // Load and remove from swap if it is new.
             if (isNew())
-                oldRow = unswap(retval, false);
+                oldRow = unswap(null, false);
 
             old = val;
 
@@ -2408,7 +2417,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     return null;
 
                 if (val == null && offheap)
-                    unswap(true, false);
+                    unswap(null, false);
 
                 if (checkExpired()) {
                     if (cctx.deferredDelete()) {
@@ -2645,7 +2654,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean isNew = isStartVersion();
 
         if (isNew)
-            unswap(true, false);
+            unswap(null, false);
 
         CacheObject val = this.val;
 
@@ -2949,7 +2958,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 checkObsolete();
 
                 if (isStartVersion())
-                    unswap(true, false);
+                    unswap(null, false);
 
                 long expireTime = expireTimeExtras();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 54fcafa..bc486e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -375,31 +375,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
             ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
                 @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
-                    final GridCloseableIterator iter0 = qry.executeScanQuery();
-
-                    final boolean needToConvert = transformer == null;
-
-                    return new GridCloseableIteratorAdapter<R>() {
-                        @Override protected R onNext() throws IgniteCheckedException {
-                            Object next = iter0.nextX();
-
-                            if (needToConvert) {
-                                Map.Entry<K, V> entry = (Map.Entry<K, V>)next;
-
-                                return (R)new CacheEntryImpl<>(entry.getKey(), entry.getValue());
-                            }
-
-                            return (R)next;
-                        }
-
-                        @Override protected boolean onHasNext() throws IgniteCheckedException {
-                            return iter0.hasNextX();
-                        }
-
-                        @Override protected void onClose() throws IgniteCheckedException {
-                            iter0.close();
-                        }
-                    };
+                    return qry.executeScanQuery();
                 }
             }, true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 7da3d4f..5566bb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -22,8 +22,8 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
@@ -53,7 +53,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException {
+    @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) throws IgniteCheckedException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 6e606bf..ce728b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -30,9 +30,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvcc;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
@@ -443,7 +443,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) {
+    @Nullable @Override public CacheDataRow unswap(CacheDataRow row, boolean checkExpire) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
new file mode 100644
index 0000000..4787464
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryEntry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import javax.cache.Cache;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.processors.cache.CacheEntryImplEx;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+final class CacheQueryEntry<K,V> extends IgniteBiTuple<K,V> implements Cache.Entry<K,V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public CacheQueryEntry() {
+        // No-op.
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     */
+    CacheQueryEntry(@Nullable K key, @Nullable V val) {
+        super(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T unwrap(Class<T> cls) {
+        if (cls != null && cls.isAssignableFrom(getClass()))
+            return cls.cast(this);
+
+        if (cls.isAssignableFrom(CacheEntryImpl.class))
+            return (T)new CacheEntryImpl<>(getKey(), getValue());
+
+        if (cls.isAssignableFrom(CacheEntry.class))
+            return (T)new CacheEntryImplEx<>(getKey(), getValue(), null);
+
+        throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 7f859a2..b860f02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -632,7 +631,20 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
                 if (locIter != null && locIter.hasNextX())
                     cur = locIter.nextX();
 
-                return cur != null || (cur = fut.next()) != null;
+                return cur != null || (cur = convert(fut.next())) != null;
+            }
+
+            /**
+             * @param obj Entry to convert.
+             * @return Cache entry
+             */
+            private Object convert(Object obj) {
+                if(qry.transform() != null)
+                    return obj;
+
+                Map.Entry e = (Map.Entry)obj;
+
+                return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
             }
 
             @Override protected void onClose() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 023c03c..c4eae8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -517,7 +518,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException {
         assert type == SCAN : "Wrong processing of qyery: " + type;
 
-        Collection<ClusterNode> nodes = nodes();
+        // Affinity nodes snapshot.
+        Collection<ClusterNode> nodes = new ArrayList<>(nodes());
 
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -537,13 +539,15 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (part != null && !cctx.isLocal())
-            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
-        else {
-            boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
+        boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId());
 
-            return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes);
-        }
+        if (loc)
+            return qryMgr.scanQueryLocal(this, true);
+
+        if (part != null)
+            return new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx);
+        else
+            return qryMgr.scanQueryDistributed(this, nodes);
     }
 
     /**
@@ -621,12 +625,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     /**
      * Wrapper for queries with fallback.
      */
-    private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter<Map.Entry> {
+    private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** Query future. */
-        private volatile T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> tuple;
+        private volatile T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> tuple;
 
         /** Backups. */
         private volatile Queue<ClusterNode> nodes;
@@ -653,7 +657,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         private boolean firstItemReturned;
 
         /** */
-        private Map.Entry cur;
+        private Object cur;
 
         /**
          * @param part Partition.
@@ -726,7 +730,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 }
             }
             else {
-                final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null);
+                final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.transform, null);
 
                 GridCacheQueryFutureAdapter fut =
                     (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node));
@@ -736,13 +740,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         }
 
         /** {@inheritDoc} */
-        @Override protected Map.Entry onNext() throws IgniteCheckedException {
+        @Override protected Object onNext() throws IgniteCheckedException {
             if (!onHasNext())
                 throw new NoSuchElementException();
 
             assert cur != null;
 
-            Map.Entry e = cur;
+            Object e = cur;
 
             cur = null;
 
@@ -755,9 +759,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                 if (cur != null)
                     return true;
 
-                T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
+                T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
 
-                GridCloseableIterator<Map.Entry> iter = t.get1();
+                GridCloseableIterator<Object> iter = t.get1();
 
                 if (iter != null) {
                     boolean hasNext = iter.hasNext();
@@ -773,14 +777,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
                     assert fut != null;
 
                     if (firstItemReturned)
-                        return (cur = (Map.Entry)fut.next()) != null;
+                        return (cur = convert(fut.next())) != null;
 
                     try {
                         fut.awaitFirstPage();
 
                         firstItemReturned = true;
 
-                        return (cur = (Map.Entry)fut.next()) != null;
+                        return (cur = convert(fut.next())) != null;
                     }
                     catch (IgniteClientDisconnectedCheckedException e) {
                         throw CU.convertToCacheException(e);
@@ -793,6 +797,19 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         }
 
         /**
+         * @param obj Entry to convert.
+         * @return Cache query entry, or {@code obj} unchanged if the query has a transformer.
+         */
+        private Object convert(Object obj) {
+            if(qry.transform() != null)
+                return obj;
+
+            Map.Entry e = (Map.Entry)obj;
+
+            return e == null ? null : new CacheQueryEntry(e.getKey(), e.getValue());
+        }
+
+        /**
          * @param e Exception for query run.
          */
         private void retryIfPossible(IgniteCheckedException e) {
@@ -847,7 +864,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
         @Override protected void onClose() throws IgniteCheckedException {
             super.onClose();
 
-            T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple;
+            T2<GridCloseableIterator<Object>, GridCacheQueryFutureAdapter> t = tuple;
 
             if (t != null && t.get1() != null)
                 t.get1().close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 3e772cd..3e27720 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -40,10 +40,10 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
-import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.QueryIndexType;
 import org.apache.ignite.cache.query.QueryMetrics;
@@ -63,7 +63,9 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
@@ -71,10 +73,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
@@ -821,22 +823,22 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @throws IgniteCheckedException If failed to get iterator.
      */
     @SuppressWarnings({"unchecked"})
-    private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
+    private GridCloseableIterator scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode)
         throws IgniteCheckedException {
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
 
         try {
             injectResources(keyValFilter);
 
-            Integer part = qry.partition();
-
-            if (cctx.isLocal())
-                part = null;
+            Integer part = cctx.isLocal() ? null : qry.partition();
 
             if (part != null && (part < 0 || part >= cctx.affinity().partitions()))
-                return new GridEmptyCloseableIterator<>();
-
-            final ExpiryPolicy plc = cctx.expiry();
+                return new GridEmptyCloseableIterator() {
+                    @Override public void close() throws IgniteCheckedException {
+                        closeScanFilter(keyValFilter);
+                        super.close();
+                    }
+                };
 
             AffinityTopologyVersion topVer = GridQueryProcessor.getRequestAffinityTopologyVersion();
 
@@ -858,13 +860,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
                         "Partition can not be reserved");
 
-                if (locPart0.state() != OWNING) {
-                    locPart0.release();
-
-                    throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
-                        "Partition can not be reserved");
-                }
-
                 locPart = locPart0;
 
                 it = cctx.offheap().cachePartitionIterator(cctx.cacheId(), part);
@@ -872,19 +867,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             else {
                 locPart = null;
 
+                // TODO shouldn't we reserve all involved partitions?
                 it = cctx.offheap().cacheIterator(cctx.cacheId(), true, backups, topVer);
             }
 
-            return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) {
-                @Override protected void onClose() {
-                    super.onClose();
-
-                    if (locPart != null)
-                        locPart.release();
-
-                    closeScanFilter(keyValFilter);
-                }
-            };
+            return new ScanQueryIterator(it, qry, topVer, locPart, keyValFilter, locNode, cctx, log);
         }
         catch (IgniteCheckedException | RuntimeException e) {
             closeScanFilter(keyValFilter);
@@ -1189,9 +1176,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                 final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-                while (!Thread.currentThread().isInterrupted() && iter.hasNext()) {
+                CacheObjectContext objCtx = cctx.cacheObjectContext();
+
+                while (!Thread.currentThread().isInterrupted()) {
                     long start = statsEnabled ? System.nanoTime() : 0L;
 
+                    // hasNext() must be called after the start time is captured because
+                    // the actual row extraction may happen inside it.
+                    if(!iter.hasNext())
+                        break;
+
                     IgniteBiTuple<K, V> row = iter.next();
 
                     // Query is cancelled.
@@ -1249,8 +1243,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     V val0 = null;
 
                     if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
-                        key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
-                        val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+                        key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
+                        val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
 
                         switch (type) {
                             case SQL:
@@ -1320,9 +1314,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
                     if (rdc != null || trans != null) {
                         if (key0 == null)
-                            key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary());
+                            key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, qry.keepBinary(), false);
                         if (val0 == null)
-                            val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary());
+                            val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, qry.keepBinary(), false);
 
                         Cache.Entry<K, V> entry = new CacheEntryImpl(key0, val0);
 
@@ -1422,22 +1416,24 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * Process local scan query.
      *
      * @param qry Query.
-     * @param updStatisticsIfNeeded Update statistics flag.
+     * @param updateStatistics Update statistics flag.
      */
     @SuppressWarnings({"unchecked", "serial"})
     protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry,
-        final boolean updStatisticsIfNeeded) throws IgniteCheckedException {
+        boolean updateStatistics) throws IgniteCheckedException {
         if (!enterBusy())
             throw new IllegalStateException("Failed to process query request (grid is stopping).");
 
         final boolean statsEnabled = cctx.config().isStatisticsEnabled();
 
-        boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled;
+        updateStatistics &= statsEnabled;
 
         long startTime = U.currentTimeMillis();
 
         final String namex = cctx.name();
 
+        final IgniteBiPredicate<K, V> scanFilter = qry.scanFilter();
+
         try {
             assert qry.type() == SCAN;
 
@@ -1445,7 +1441,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 log.debug("Running local SCAN query: " + qry);
 
             final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
-            final IgniteBiPredicate filter = qry.scanFilter();
             final ClusterNode locNode = cctx.localNode();
             final UUID subjId = qry.subjectId();
 
@@ -1458,80 +1453,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     namex,
                     null,
                     null,
-                    filter,
+                    scanFilter,
                     null,
                     null,
                     subjId,
                     taskName));
             }
 
-            final GridCloseableIterator<IgniteBiTuple<K, V>> iter = scanIterator(qry, true);
+            GridCloseableIterator it = scanIterator(qry, true);
 
-            if (updStatisticsIfNeeded)
-                needUpdStatistics = false;
-
-            final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-            return new GridCloseableIteratorAdapter<Object>() {
-                @Override protected Object onNext() throws IgniteCheckedException {
-                    long start = statsEnabled ? System.nanoTime() : 0L;
+            updateStatistics = false;
 
-                    IgniteBiTuple<K, V> next = iter.nextX();
-
-                    if (statsEnabled) {
-                        CacheMetricsImpl metrics = cctx.cache().metrics0();
-
-                        metrics.onRead(true);
-
-                        metrics.addGetTimeNanos(System.nanoTime() - start);
-                    }
-
-                    if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
-                        cctx.gridEvents().record(new CacheQueryReadEvent<>(
-                            cctx.localNode(),
-                            "Scan query entry read.",
-                            EVT_CACHE_QUERY_OBJECT_READ,
-                            CacheQueryType.SCAN.name(),
-                            namex,
-                            null,
-                            null,
-                            filter,
-                            null,
-                            null,
-                            subjId,
-                            taskName,
-                            next.getKey(),
-                            next.getValue(),
-                            null,
-                            null));
-                    }
-
-                    IgniteClosure transform = qry.transform();
-
-                    if (transform == null)
-                        return next;
-
-                    Cache.Entry<K, V> entry;
-
-                    if (qry.keepBinary())
-                        entry = cctx.cache().keepBinary().getEntry(next.getKey());
-                    else
-                        entry = cctx.cache().getEntry(next.getKey());
-
-                    return transform.apply(entry);
-                }
-
-                @Override protected boolean onHasNext() throws IgniteCheckedException {
-                    return iter.hasNextX();
-                }
-
-                @Override protected void onClose() throws IgniteCheckedException {
-                    iter.close();
-                }
-            };
+            return it;
         }
         catch (Exception e) {
-            if (needUpdStatistics)
+            closeScanFilter(scanFilter);
+
+            if (updateStatistics)
                 cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex, startTime,
                     U.currentTimeMillis() - startTime, true);
 
@@ -2047,8 +1985,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private static final long serialVersionUID = 0L;
 
         /**
-         * Number of fields to report when no fields defined.
-         * Includes _key and _val columns.
+         * Number of fields to report when no fields are defined. Includes _key and _val columns.
          */
         private static final int NO_FIELDS_COLUMNS_COUNT = 2;
 
@@ -2862,14 +2799,68 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     *
+     * Request future map that remembers IDs removed before being put, so such requests can be detected as canceled.
      */
-    private class PeekValueExpiryAwareIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+    private class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>> {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Maximum number of canceled request IDs to remember. */
+        private static final int CANCELED_COUNT = 128;
+
+        /**
+         * The ID of the canceled request is stored to the set in case remove(reqId) is called before put(reqId,
+         * future).
+         */
+        private Set<Long> canceled;
+
+        /** {@inheritDoc} */
+        @Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key) {
+            if (containsKey(key))
+                return super.remove(key);
+            else {
+                if (canceled == null) {
+                    canceled = Collections.newSetFromMap(
+                        new LinkedHashMap<Long, Boolean>() {
+                            @Override protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
+                                return size() > CANCELED_COUNT;
+                            }
+                        });
+                }
+
+                canceled.add((Long)key);
+
+                return null;
+            }
+        }
+
+        /**
+         * @return true if the key is canceled
+         */
+        boolean isCanceled(Long key) {
+            return canceled != null && canceled.contains(key);
+        }
+    }
+
+    /** */
+    private static final class ScanQueryIterator<K, V> extends GridCloseableIteratorAdapter<Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final GridDhtCacheAdapter dht;
+
+        /** */
+        private final GridDhtLocalPartition locPart;
+
         /** */
-        private final ExpiryPolicy plc;
+        private final IgniteBiPredicate<K, V> scanFilter;
+
+        /** */
+        private final boolean statsEnabled;
+
+        /** */
+        private final GridIterator<CacheDataRow> it;
 
         /** */
         private final GridCacheAdapter cache;
@@ -2878,73 +2869,94 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         private final AffinityTopologyVersion topVer;
 
         /** */
-        private final GridDhtCacheAdapter dht;
+        private final boolean keepBinary;
 
         /** */
-        private final IgniteBiPredicate<K, V> keyValFilter;
+        private final boolean readEvt;
 
         /** */
-        private boolean locNode;
+        private final String cacheName;
 
         /** */
-        private final boolean keepBinary;
+        private final UUID subjId;
 
         /** */
-        private IgniteBiTuple<K, V> next;
+        private final String taskName;
 
         /** */
-        private IgniteCacheExpiryPolicy expiryPlc;
+        private final IgniteClosure transform;
+
+        /** */
+        private final CacheObjectContext objCtx;
+
+        /** */
+        private final GridCacheContext cctx;
+
+        /** */
+        private final IgniteLogger log;
 
         /** */
-        private GridIterator<CacheDataRow> it;
+        private Object next;
 
-        /** Need advance. */
+        /** */
         private boolean needAdvance;
 
+        /** */
+        private IgniteCacheExpiryPolicy expiryPlc;
+
         /**
          * @param it Iterator.
-         * @param plc Expiry policy.
+         * @param qry Query.
          * @param topVer Topology version.
-         * @param keyValFilter Key-value filter.
-         * @param keepBinary Keep binary flag from the query.
-         * @param locNode Local node.
+         * @param locPart Local partition.
+         * @param scanFilter Scan filter.
+         * @param locNode Local node flag.
+         * @param cctx Cache context.
+         * @param log Logger.
          */
-        private PeekValueExpiryAwareIterator(
+        ScanQueryIterator(
             GridIterator<CacheDataRow> it,
-            ExpiryPolicy plc,
+            GridCacheQueryAdapter qry,
             AffinityTopologyVersion topVer,
-            IgniteBiPredicate<K, V> keyValFilter,
-            boolean keepBinary,
-            boolean locNode
-        ) {
+            GridDhtLocalPartition locPart,
+            IgniteBiPredicate<K, V> scanFilter,
+            boolean locNode,
+            GridCacheContext cctx,
+            IgniteLogger log) {
             this.it = it;
-            this.plc = plc;
             this.topVer = topVer;
-            this.keyValFilter = keyValFilter;
-            this.locNode = locNode;
+            this.locPart = locPart;
+            this.scanFilter = scanFilter;
+            this.cctx = cctx;
+            this.log = log;
 
-            dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
-            cache = dht != null ? dht : cctx.cache();
+            statsEnabled = locNode && cctx.config().isStatisticsEnabled();
 
-            this.keepBinary = keepBinary;
-            expiryPlc = cctx.cache().expiryPolicy(plc);
+            readEvt = locNode && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
-            needAdvance = true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean onHasNext() {
-            if (needAdvance) {
-                advance();
-
-                needAdvance = false;
+            if(readEvt){
+                taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash());
+                subjId = qry.subjectId();
+            }
+            else {
+                taskName = null;
+                subjId = null;
             }
 
-            return next != null;
+            // Keep binary format for remote scans when there is no scan filter to apply.
+            keepBinary = (!locNode && scanFilter == null) || qry.keepBinary();
+            transform = qry.transform();
+            dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+            cache = dht != null ? dht : cctx.cache();
+            objCtx = cctx.cacheObjectContext();
+            cacheName = cctx.name();
+
+            needAdvance = true;
+            expiryPlc = this.cctx.cache().expiryPolicy(null);
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteBiTuple<K, V> onNext() {
+        @Override protected Object onNext() {
             if (needAdvance)
                 advance();
             else
@@ -2957,26 +2969,64 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
 
         /** {@inheritDoc} */
+        @Override protected boolean onHasNext() {
+            if (needAdvance) {
+                advance();
+
+                needAdvance = false;
+            }
+
+            return next != null;
+        }
+
+        /** {@inheritDoc} */
         @Override protected void onClose() {
-            sendTtlUpdate();
+            if (expiryPlc != null && dht != null) {
+                dht.sendTtlUpdateRequest(expiryPlc);
+
+                expiryPlc = null;
+            }
+
+            if (locPart != null)
+                locPart.release();
+
+            closeScanFilter(scanFilter);
         }
 
         /**
          * Moves the iterator to the next cache entry.
          */
         private void advance() {
-            IgniteBiTuple<K, V> next0 = null;
+            long start = statsEnabled ? System.nanoTime() : 0L;
+
+            Object next = null;
 
             while (it.hasNext()) {
                 CacheDataRow row = it.next();
 
                 KeyCacheObject key = row.key();
-
                 CacheObject val;
 
                 if (expiryPlc != null) {
                     try {
-                        val = value(key);
+                        CacheDataRow tmp = row;
+
+                        while (true) {
+                            try {
+                                GridCacheEntryEx entry = cache.entryEx(key);
+
+                                entry.unswap(tmp);
+
+                                val = entry.peek(true, true, topVer, expiryPlc);
+
+                                cctx.evicts().touch(entry, topVer);
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                tmp = null;
+                            }
+                        }
                     }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
@@ -2985,126 +3035,58 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         val = null;
                     }
 
-                    if (dht != null && expiryPlc.readyToFlush(100)) {
+                    if (dht != null && expiryPlc.readyToFlush(100))
                         dht.sendTtlUpdateRequest(expiryPlc);
-
-                        expiryPlc = cctx.cache().expiryPolicy(plc);
-                    }
                 }
                 else
                     val = row.value();
 
                 if (val != null) {
-                    boolean keepBinary0 = !locNode || keepBinary;
+                    K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key, keepBinary, false);
+                    V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val, keepBinary, false);
 
-                    next0 = F.t(
-                        (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0),
-                        (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0));
+                    if (statsEnabled) {
+                        CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-                    boolean passPred = true;
+                        metrics.onRead(true);
 
-                    if (keyValFilter != null) {
-                        Object key0 = next0.getKey();
-                        Object val0 = next0.getValue();
+                        metrics.addGetTimeNanos(System.nanoTime() - start);
+                    }
 
-                        if (keepBinary0 && !keepBinary) {
-                            key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary);
-                            val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary);
+                    if (scanFilter == null || scanFilter.apply(key0, val0)) {
+                        if (readEvt && cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ)) {
+                            cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                                cctx.localNode(),
+                                "Scan query entry read.",
+                                EVT_CACHE_QUERY_OBJECT_READ,
+                                CacheQueryType.SCAN.name(),
+                                cacheName,
+                                null,
+                                null,
+                                scanFilter,
+                                null,
+                                null,
+                                subjId,
+                                taskName,
+                                key0,
+                                val0,
+                                null,
+                                null));
                         }
 
-                        passPred = keyValFilter.apply((K)key0, (V)val0);
-                    }
+                        next = transform == null ? new CacheQueryEntry<>(key0, val0)
+                            : transform.apply(new CacheQueryEntry<>(key0, val0));
 
-                    if (passPred)
                         break;
-                    else
-                        next0 = null;
+                    }
                 }
             }
 
-            next = next0;
-
-            if (next == null)
-                sendTtlUpdate();
-        }
-
-        /**
-         * Sends TTL update.
-         */
-        private void sendTtlUpdate() {
-            if (dht != null && expiryPlc != null) {
+            if ((this.next = next) == null && expiryPlc != null && dht != null) {
                 dht.sendTtlUpdateRequest(expiryPlc);
 
                 expiryPlc = null;
             }
         }
-
-        /**
-         * @param key Key.
-         * @return Value.
-         * @throws IgniteCheckedException If failed to peek value.
-         */
-        private CacheObject value(KeyCacheObject key) throws IgniteCheckedException {
-            while (true) {
-                try {
-                    GridCacheEntryEx entry = cache.entryEx(key);
-
-                    entry.unswap();
-
-                    CacheObject cacheObj = entry.peek(true, true, topVer, expiryPlc);
-
-                    cctx.evicts().touch(entry, topVer);
-
-                    return cacheObj;
-                }
-                catch (GridCacheEntryRemovedException ignore) {
-                    // No-op.
-                }
-            }
-        }
-    }
-
-    /**
-     * The map prevents put to the map in case the specified request has been removed previously.
-     */
-    private class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Count of canceled keys */
-        private static final int CANCELED_COUNT = 128;
-
-        /**
-         * The ID of the canceled request is stored to the set in case
-         * remove(reqId) is called before put(reqId, future).
-         */
-        private Set<Long> canceled;
-
-        /** {@inheritDoc} */
-        @Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key) {
-            if (containsKey(key))
-                return super.remove(key);
-            else {
-                if (canceled == null) {
-                    canceled = Collections.newSetFromMap(
-                        new LinkedHashMap<Long, Boolean>() {
-                            @Override protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
-                                return size() > CANCELED_COUNT;
-                            }
-                        });
-                }
-
-                canceled.add((Long)key);
-
-                return null;
-            }
-        }
-
-        /**
-         * @return true if the key is canceled
-         */
-        boolean isCanceled(Long key) {
-            return canceled != null && canceled.contains(key);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 70711e5..17be90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cacheobject;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.UUID;
@@ -40,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -341,166 +339,4 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
         return false;
     }
 
-    /**
-     * Wraps key provided by user, must be serialized before stored in cache.
-     */
-    private static class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         *
-         */
-        public UserKeyCacheObjectImpl() {
-            //No-op.
-        }
-
-        /**
-         * @param key Key.
-         * @param part Partition.
-         */
-        UserKeyCacheObjectImpl(Object key, int part) {
-            super(key, null, part);
-        }
-
-        /**
-         * @param key Key.
-         * @param valBytes Marshalled key.
-         * @param part Partition.
-         */
-        UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
-            super(key, valBytes, part);
-        }
-
-        /** {@inheritDoc} */
-        @Override public KeyCacheObject copy(int part) {
-            if (this.partition() == part)
-                return this;
-
-            return new UserKeyCacheObjectImpl(val, valBytes, part);
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
-            try {
-                IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
-
-                if (!proc.immutable(val)) {
-                    if (valBytes == null)
-                        valBytes = proc.marshal(ctx, val);
-
-                    boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
-
-                    ClassLoader ldr = p2pEnabled ?
-                        IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
-
-                    Object val = proc.unmarshal(ctx, valBytes, ldr);
-
-                    KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
-
-                    key.partition(partition());
-
-                    return key;
-                }
-
-                KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
-
-                key.partition(partition());
-
-                return key;
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to marshal object: " + val, e);
-            }
-        }
-    }
-
-    /**
-     * Wraps value provided by user, must be serialized before stored in cache.
-     */
-    private static class UserCacheObjectImpl extends CacheObjectImpl {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         *
-         */
-        public UserCacheObjectImpl() {
-            //No-op.
-        }
-
-        /**
-         * @param val Value.
-         * @param valBytes Value bytes.
-         */
-        public UserCacheObjectImpl(Object val, byte[] valBytes) {
-            super(val, valBytes);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
-            return super.value(ctx, false); // Do not need copy since user value is not in cache.
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
-            try {
-                IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
-
-                if (valBytes == null)
-                    valBytes = proc.marshal(ctx, val);
-
-                if (ctx.storeValue()) {
-                    boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
-
-                    ClassLoader ldr = p2pEnabled ?
-                        IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
-
-                    Object val = this.val != null && proc.immutable(this.val) ? this.val :
-                        proc.unmarshal(ctx, valBytes, ldr);
-
-                    return new CacheObjectImpl(val, valBytes);
-                }
-
-                return new CacheObjectImpl(null, valBytes);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to marshal object: " + val, e);
-            }
-        }
-    }
-
-    /**
-     * Wraps value provided by user, must be copied before stored in cache.
-     */
-    private static class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         *
-         */
-        public UserCacheObjectByteArrayImpl() {
-            // No-op.
-        }
-
-        /**
-         * @param val Value.
-         */
-        public UserCacheObjectByteArrayImpl(byte[] val) {
-            super(val);
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
-            return super.value(ctx, false); // Do not need copy since user value is not in cache.
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
-            byte[] valCpy = Arrays.copyOf(val, val.length);
-
-            return new CacheObjectByteArrayImpl(valCpy);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
new file mode 100644
index 0000000..aa4d5f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectByteArrayImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps a byte array value provided by user; {@link #prepareForCache} returns a copy of it for storing in cache.
+ */
+public class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * No-arg constructor; performs no initialization of its own.
+     */
+    public UserCacheObjectByteArrayImpl() {
+        // No-op.
+    }
+
+    /**
+     * @param val User byte array, passed to the superclass constructor.
+     */
+    public UserCacheObjectByteArrayImpl(byte[] val) {
+        super(val);
+    }
+
+    /** {@inheritDoc} The {@code cpy} flag is ignored: the call is always delegated with {@code false}. */
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
+        return super.value(ctx, false); // Do not need copy since user value is not in cache.
+    }
+
+    /** {@inheritDoc} Returns a new {@link CacheObjectByteArrayImpl} over a copy of the array; {@code ctx} is unused. */
+    @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+        byte[] valCpy = Arrays.copyOf(val, val.length);
+
+        return new CacheObjectByteArrayImpl(valCpy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
new file mode 100644
index 0000000..241c12b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserCacheObjectImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps value provided by user, must be serialized before stored in cache (see {@link #prepareForCache}).
+ */
+public class UserCacheObjectImpl extends CacheObjectImpl {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * No-arg constructor; performs no initialization of its own.
+     */
+    public UserCacheObjectImpl() {
+        // No-op.
+    }
+
+    /**
+     * @param val User value.
+     * @param valBytes Marshalled value bytes; when {@code null} they are produced in {@link #prepareForCache}.
+     */
+    public UserCacheObjectImpl(Object val, byte[] valBytes) {
+        super(val, valBytes);
+    }
+
+    /** {@inheritDoc} The {@code cpy} flag is ignored: the call is always delegated with {@code false}. */
+    @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
+        return super.value(ctx, false); // Do not need copy since user value is not in cache.
+    }
+
+    /** {@inheritDoc} Returns a plain {@link CacheObjectImpl}; checked errors are rethrown as {@link IgniteException}. */
+    @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+        try {
+            IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+            // Marshal only when no bytes were supplied; the result is kept in this wrapper's valBytes field.
+            if (valBytes == null)
+                valBytes = proc.marshal(ctx, val);
+            // When ctx.storeValue() is true the returned object carries a value in addition to the bytes.
+            if (ctx.storeValue()) {
+                boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+                // NOTE(review): 'val' below is the field and is dereferenced before the null check - confirm non-null.
+                ClassLoader ldr = p2pEnabled ?
+                    IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
+                // A non-null immutable value is reused as is; otherwise it is re-created from the marshalled bytes.
+                Object val = this.val != null && proc.immutable(this.val) ? this.val :
+                    proc.unmarshal(ctx, valBytes, ldr);
+
+                return new CacheObjectImpl(val, valBytes);
+            }
+            // Otherwise only the marshalled bytes are kept in the returned object.
+            return new CacheObjectImpl(null, valBytes);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to marshal object: " + val, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
new file mode 100644
index 0000000..de57667
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/UserKeyCacheObjectImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cacheobject;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Wraps key provided by user, must be serialized before stored in cache (see {@link #prepareForCache}).
+ */
+public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * No-arg constructor; performs no initialization of its own.
+     */
+    public UserKeyCacheObjectImpl() {
+        // No-op.
+    }
+
+    /**
+     * @param key User key; no marshalled bytes are supplied ({@code null} is passed to the superclass).
+     * @param part Partition.
+     */
+    UserKeyCacheObjectImpl(Object key, int part) {
+        super(key, null, part);
+    }
+
+    /**
+     * @param key User key.
+     * @param valBytes Marshalled key.
+     * @param part Partition.
+     */
+    UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
+        super(key, valBytes, part);
+    }
+
+    /** {@inheritDoc} Returns {@code this} when {@code part} equals the current partition, otherwise a new wrapper. */
+    @Override public KeyCacheObject copy(int part) {
+        if (this.partition() == part)
+            return this;
+
+        return new UserKeyCacheObjectImpl(val, valBytes, part);
+    }
+
+    /** {@inheritDoc} Returns a plain {@link KeyCacheObjectImpl}; checked errors are rethrown as {@link IgniteException}. */
+    @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
+        try {
+            IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+            // Keys not reported as immutable are re-created through a marshal/unmarshal round trip.
+            if (!proc.immutable(val)) {
+                if (valBytes == null)
+                    valBytes = proc.marshal(ctx, val);
+
+                boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+
+                ClassLoader ldr = p2pEnabled ?
+                    IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
+
+                Object val = proc.unmarshal(ctx, valBytes, ldr);
+
+                KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+                // NOTE(review): partition() was already passed to the constructor above - looks redundant, confirm.
+                key.partition(partition());
+
+                return key;
+            }
+            // Immutable keys are wrapped directly, with whatever bytes are already present (possibly null).
+            KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
+
+            key.partition(partition());
+
+            return key;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to marshal object: " + val, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1c9d80a5/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 46fcfea..1d8720c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -61,7 +60,6 @@ import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
-import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -1309,7 +1307,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             return cache.context().itHolder().iterator(iter,
                 new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>() {
                     @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
-                        return new CacheEntryImpl<>(e.getKey(), e.getValue());
+                        // Actually Scan Query returns Iterator<CacheQueryEntry> by default,
+                        // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces.
+                        return (Cache.Entry<Object, Object>)e;
                     }
 
                     @Override protected void remove(Cache.Entry<Object, Object> item) {


Mime
View raw message