ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/15] ignite git commit: ignite-db-x-10884
Date Wed, 04 May 2016 14:38:50 GMT
ignite-db-x-10884


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43de3d67
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43de3d67
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43de3d67

Branch: refs/heads/ignite-db-x-10884
Commit: 43de3d67dbef5f503289728a6c6026c3106a39d3
Parents: 891d706
Author: sboikov <sboikov@gridgain.com>
Authored: Wed May 4 11:02:11 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 4 17:21:59 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   11 +-
 .../processors/cache/GridCacheContext.java      |    9 +-
 .../processors/cache/GridCacheProcessor.java    |    5 -
 .../cache/IgniteCacheOffheapManager.java        |  492 ++++++---
 .../distributed/dht/GridDhtLocalPartition.java  |    3 +
 .../dht/preloader/GridDhtPartitionSupplier.java |   91 +-
 .../cache/query/GridCacheQueryManager.java      |  261 ++++-
 .../cache/query/GridCacheQueryResponse.java     |   49 +-
 .../processors/query/GridQueryIndexing.java     |   10 +-
 .../processors/query/GridQueryProcessor.java    |    4 +-
 .../cache/database/IgniteDbPutGetTest.java      |  121 ---
 .../database/IgniteDbMultiNodePutGetTest.java   |   33 +
 .../database/IgniteDbPutGetAbstractTest.java    | 1017 ++++++++++++++++++
 .../database/IgniteDbSingleNodePutGetTest.java  |   33 +
 .../loadtests/hashmap/GridCacheTestContext.java |    1 -
 .../testsuites/IgniteDatabaseTestSuite.java     |   40 +
 .../processors/query/h2/IgniteH2Indexing.java   |   29 +-
 .../IgniteDbMultiNodePutGetRestartSelfTest.java |   20 +-
 .../IgniteDbMultiNodePutGetSelfTest.java        |   25 -
 ...IgniteDbMultiNodeWithIndexingPutGetTest.java |   28 +
 .../IgniteDbSingleNodePutGetSelfTest.java       |  995 -----------------
 ...gniteDbSingleNodeWithIndexingPutGetTest.java |   28 +
 22 files changed, 1871 insertions(+), 1434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9916f5c..7bba295 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3692,20 +3692,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     @Deprecated
     @Nullable public Cache.Entry<K, V> randomEntry() {
-        GridCloseableIterator<CacheDataRow> it;
+        GridCloseableIterator<Cache.Entry<K, V>> it;
 
         try {
-            it = ctx.offheap().iterator();
+            it = ctx.offheap().entriesIterator(true, true, ctx.affinity().affinityTopologyVersion());
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
         }
 
-        if (it.hasNext()) {
-            CacheDataRow row = it.next();
-
-            return new CacheEntryImpl<>((K)row.key(), (V)row.value(), row.version());
-        }
+        if (it.hasNext())
+            return it.next();
 
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 54a4153..53fe3db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -153,9 +153,6 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** Continuous query manager. */
     private CacheContinuousQueryManager contQryMgr;
 
-    /** Swap manager. */
-    private GridCacheSwapManager swapMgr;
-
     /** Evictions manager. */
     private GridCacheEvictionManager evictMgr;
 
@@ -258,7 +255,6 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @param affNode {@code True} if local node is affinity node.
      * @param updatesAllowed Updates allowed flag.
      * @param evtMgr Cache event manager.
-     * @param swapMgr Cache swap manager.
      * @param storeMgr Store manager.
      * @param evictMgr Cache eviction manager.
      * @param qryMgr Cache query manager.
@@ -285,7 +281,6 @@ public class GridCacheContext<K, V> implements Externalizable {
          */
 
         GridCacheEventManager evtMgr,
-        GridCacheSwapManager swapMgr,
         CacheStoreManager storeMgr,
         GridCacheEvictionManager evictMgr,
         GridCacheQueryManager<K, V> qryMgr,
@@ -303,7 +298,6 @@ public class GridCacheContext<K, V> implements Externalizable {
         assert cacheCfg != null;
 
         assert evtMgr != null;
-        assert swapMgr != null;
         assert storeMgr != null;
         assert evictMgr != null;
         assert qryMgr != null;
@@ -328,7 +322,6 @@ public class GridCacheContext<K, V> implements Externalizable {
          * ===========================
          */
         this.evtMgr = add(evtMgr);
-        this.swapMgr = add(swapMgr);
         this.storeMgr = add(storeMgr);
         this.evictMgr = add(evictMgr);
         this.qryMgr = add(qryMgr);
@@ -989,7 +982,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Swap manager.
      */
     public GridCacheSwapManager swap() {
-        return swapMgr;
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index a4b0c83..cf28113 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1308,8 +1308,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
         GridCacheEventManager evtMgr = new GridCacheEventManager();
-        GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL ||
-            !GridCacheUtils.isNearEnabled(cfg));
         GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
         GridCacheQueryManager qryMgr = queryManager(cfg);
         CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
@@ -1341,7 +1339,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
              * ===========================
              */
             evtMgr,
-            swapMgr,
             storeMgr,
             evictMgr,
             qryMgr,
@@ -1452,7 +1449,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
              * 7. GridCacheTtlManager.
              * ===============================================
              */
-            swapMgr = new GridCacheSwapManager(true);
             evictMgr = new GridCacheEvictionManager();
             evtMgr = new GridCacheEventManager();
             pluginMgr = new CachePluginManager(ctx, cfg);
@@ -1471,7 +1467,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                  * ===========================
                  */
                 evtMgr,
-                swapMgr,
                 storeMgr,
                 evictMgr,
                 qryMgr,

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bb2ef3e..1f8e3cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -40,13 +39,11 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO
 import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
-import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -92,66 +89,54 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();
+        if (cctx.affinityNode()) {
+            IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();
 
-        IgniteBiTuple<FullPageId, Boolean> page = dbMgr.meta().getOrAllocateForIndex(cctx.cacheId(), cctx.namexx());
+            IgniteBiTuple<FullPageId, Boolean> page = dbMgr.meta().getOrAllocateForIndex(cctx.cacheId(), cctx.namexx());
 
-        int cpus = Runtime.getRuntime().availableProcessors();
+            int cpus = Runtime.getRuntime().availableProcessors();
 
-        reuseList = new ReuseList(cctx.cacheId(), dbMgr.pageMemory(), cpus * 2, dbMgr.meta());
-        freeList = new FreeList(cctx, reuseList);
+            reuseList = new ReuseList(cctx.cacheId(), dbMgr.pageMemory(), cpus * 2, dbMgr.meta());
+            freeList = new FreeList(cctx, reuseList);
 
-        rowStore = new CacheDataRowStore(cctx, freeList);
+            rowStore = new CacheDataRowStore(cctx, freeList);
 
-        dataTree = new CacheDataTree(reuseList,
-            rowStore,
-            cctx,
-            dbMgr.pageMemory(),
-            page.get1(),
-            page.get2());
-    }
-
-    public ReuseList reuseList() {
-        return reuseList;
-    }
-
-    public FreeList freeList() {
-        return freeList;
+            dataTree = new CacheDataTree(reuseList,
+                rowStore,
+                cctx,
+                dbMgr.pageMemory(),
+                page.get1(),
+                page.get2());
+        }
     }
 
     /**
-     * {@inheritDoc}
+     * @return Reuse list.
      */
-    @Override
-    protected void onKernalStart0() throws IgniteCheckedException {
-        super.onKernalStart0();
+    public ReuseList reuseList() {
+        return reuseList;
     }
 
     /**
-     * TODO: GG-10884, used on only from initialValue.
+     * @return Free list.
      */
-    public boolean containsKey(KeyCacheObject key, int part) {
-        try {
-            return read(key, part) != null;
-        } catch (IgniteCheckedException e) {
-            U.error(log, "Failed to read value", e);
-
-            return false;
-        }
+    public FreeList freeList() {
+        return freeList;
     }
 
     /**
      * @param key  Key.
      * @param val  Value.
      * @param ver  Version.
+     * @param expireTime Expire time.
      * @param part Partition.
      * @throws IgniteCheckedException If failed.
      */
     public void update(KeyCacheObject key,
-                       CacheObject val,
-                       GridCacheVersion ver,
-                       long expireTime,
-                       int part) throws IgniteCheckedException {
+        CacheObject val,
+        GridCacheVersion ver,
+        long expireTime,
+        int part) throws IgniteCheckedException {
         if (indexingEnabled) {
             GridCacheQueryManager qryMgr = cctx.queries();
 
@@ -170,9 +155,13 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key.
+     * @param prevVal Previous value.
+     * @param prevVer Previous version.
+     * @param part Partition.
      * @throws IgniteCheckedException If failed.
      */
-    public void remove(KeyCacheObject key, CacheObject prevVal, GridCacheVersion prevVer, int part) throws IgniteCheckedException {
+    public void remove(KeyCacheObject key, CacheObject prevVal, GridCacheVersion prevVer, int part)
+        throws IgniteCheckedException {
         if (indexingEnabled) {
             GridCacheQueryManager qryMgr = cctx.queries();
 
@@ -182,7 +171,7 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
                 return;
         }
 
-        DataRow dataRow = dataTree.remove(new KeySearchRow(key));
+        DataRow dataRow = dataTree.remove(new KeySearchRow(key, 0));
 
         if (dataRow != null) {
             assert dataRow.link != 0 : dataRow;
@@ -199,14 +188,24 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
      */
     @Nullable public IgniteBiTuple<CacheObject, GridCacheVersion> read(KeyCacheObject key, int part)
         throws IgniteCheckedException {
-        if (indexingEnabled)
-            return cctx.queries().read(key, part);
+        if (indexingEnabled) {
+            IgniteBiTuple<CacheObject, GridCacheVersion> t = cctx.queries().read(key, part);
 
-        DataRow dataRow = dataTree.findOne(new KeySearchRow(key));
+            if (t != null)
+                return t.get1() != null ? t : null;
+        }
+
+        DataRow dataRow = dataTree.findOne(new KeySearchRow(key, 0));
 
         return dataRow != null ? F.t(dataRow.val, dataRow.ver) : null;
     }
 
+    /**
+     * @param key Key.
+     * @param part Partition.
+     * @return Value.
+     * @throws IgniteCheckedException If failed.
+     */
     @Nullable CacheObject readValue(KeyCacheObject key, int part) throws IgniteCheckedException {
         IgniteBiTuple<CacheObject, GridCacheVersion> t = read(key, part);
 
@@ -214,6 +213,20 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * TODO: GG-10884, used on only from initialValue.
+     */
+    public boolean containsKey(KeyCacheObject key, int part) {
+        try {
+            return read(key, part) != null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to read value", e);
+
+            return false;
+        }
+    }
+
+    /**
      * Clears offheap entries.
      *
      * @param readers {@code True} to clear readers.
@@ -223,13 +236,17 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
             clear(dataTree, readers);
 
         if (indexingEnabled) {
-            List<BPlusTree<?, ? extends CacheDataRow>> idxs = cctx.queries().pkIndexes();
+            BPlusTree<?, ? extends CacheDataRow> idx = cctx.queries().pkIndex();
 
-            for (BPlusTree<?, ? extends CacheDataRow> tree : idxs)
-                clear(tree, readers);
+            if (idx != null)
+                clear(idx, readers);
         }
     }
 
+    /**
+     * @param ldr Class loader.
+     * @return Number of undeployed entries.
+     */
     public int onUndeploy(ClassLoader ldr) {
         // TODO: GG-10884.
         return 0;
@@ -261,18 +278,53 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
                     GridCacheEntryEx entry = cctx.cache().entryEx(key);
 
                     entry.clear(obsoleteVer, readers);
-                } catch (GridDhtInvalidPartitionException ignore) {
+                }
+                catch (GridDhtInvalidPartitionException ignore) {
                     // Ignore.
-                } catch (IgniteCheckedException e) {
+                }
+                catch (IgniteCheckedException e) {
                     U.error(log, "Failed to clear cache entry: " + key, e);
                 }
             }
-        } catch (IgniteCheckedException e) {
+        }
+        catch (IgniteCheckedException e) {
             U.error(log, "Failed to clear cache entries.", e);
         }
     }
 
     /**
+     * @param part Partition.
+     * @return Number of entries in given partition.
+     */
+    public long entriesCount(int part) {
+        return 0;
+    }
+
+    /**
+     * @param primary Include primary node keys.
+     * @param backup Include backup node keys.
+     * @param topVer Topology version.
+     * @return Entries count.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long entriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        long cnt = 0;
+
+        if (dataTree != null)
+            cnt += entriesCount(dataTree, primary, backup, topVer);
+
+        if (indexingEnabled) {
+            BPlusTree<?, ? extends CacheDataRow> idx = cctx.queries().pkIndex();
+
+            if (idx != null)
+                cnt += entriesCount(idx, primary, backup, topVer);
+        }
+
+        return cnt;
+    }
+
+    /**
      * @param tree    Tree.
      * @param primary Include primary node keys.
      * @param backup  Include backup node keys.
@@ -281,9 +333,9 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException If failed.
      */
     private long entriesCount(BPlusTree<?, ? extends CacheDataRow> tree,
-                              boolean primary,
-                              boolean backup,
-                              AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        boolean primary,
+        boolean backup,
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
         GridCursor<? extends CacheDataRow> cur = tree.find(null, null);
 
         ClusterNode locNode = cctx.localNode();
@@ -310,160 +362,187 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
         return cnt;
     }
 
-    public long entriesCount(int part) {
+    public long offHeapAllocatedSize() {
+        // TODO GG-10884.
         return 0;
     }
 
     /**
-     * @param primary Include primary node keys.
-     * @param backup Include backup node keys.
-     * @param topVer Topology version.
-     * @return Entries count.
+     * @param primary {@code True} if need return primary entries.
+     * @param backup {@code True} if need return backup entries.
+     * @param topVer Topology version to use.
+     * @return Entries iterator.
      * @throws IgniteCheckedException If failed.
      */
-    public long entriesCount(boolean primary, boolean backup, AffinityTopologyVersion topVer)
+    @SuppressWarnings("unchecked")
+    public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary,
+        final boolean backup,
+        final AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
-        long cnt = 0;
-
-        if (dataTree != null)
-            cnt += entriesCount(dataTree, primary, backup, topVer);
-
-        if (indexingEnabled) {
-            List<BPlusTree<?, ? extends CacheDataRow>> idxs = cctx.queries().pkIndexes();
+        final GridCursor<CacheDataRow> cur = cursor();
 
-            for (BPlusTree<?, ? extends CacheDataRow> tree : idxs)
-                cnt += entriesCount(tree, primary, backup, topVer);
-        }
+        return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
+            /** */
+            private CacheEntryImplEx next;
 
-        return cnt;
-    }
+            @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
+                CacheEntryImplEx ret = next;
 
-    public GridCloseableIterator<CacheDataRow> iterator() throws IgniteCheckedException {
-        final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+                next = null;
 
-        return new GridCloseableIteratorAdapter<CacheDataRow>() {
-            @Override protected CacheDataRow onNext() throws IgniteCheckedException {
-                return cur.get();
+                return ret;
             }
 
             @Override protected boolean onHasNext() throws IgniteCheckedException {
-                return cur.next();
-            }
-        };
-    }
-
-    public long offHeapAllocatedSize() {
-        // TODO GG-10884.
-        return 0;
-    }
-
-    public <K, V> GridIterator<IgniteBiTuple<K, V>> scanQueryIterator(@Nullable final IgniteBiPredicate<K, V> filter, final boolean keepBinary)
-        throws IgniteCheckedException {
-        final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
-
-        return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
-            private IgniteBiTuple<K, V> next;
-
-            private void advance() throws IgniteCheckedException {
                 if (next != null)
-                    return;
+                    return true;
+
+                CacheDataRow nextRow = null;
 
-                CacheDataRow row = null;
+                while (cur.next()) {
+                    CacheDataRow row = cur.get();
 
-                if (filter != null) {
-                    while (cur.next()) {
-                        row = cur.get();
+                    boolean pass;
 
-                        K key = (K)cctx.unwrapBinaryIfNeeded(row.key(), keepBinary);
-                        V val = (V)cctx.unwrapBinaryIfNeeded(row.value(), keepBinary);
+                    if (primary && backup)
+                        pass = true;
+                    else if (primary)
+                        pass = cctx.affinity().primary(cctx.localNode(), row.partition(), topVer);
+                    else
+                        pass = cctx.affinity().backup(cctx.localNode(), row.partition(), topVer);
 
-                        if (!filter.apply(key, val))
-                            continue;
+                    if (pass) {
+                        nextRow = row;
 
                         break;
                     }
                 }
-                else {
-                    if (cur.next())
-                        row = cur.get();
+
+                if (nextRow != null) {
+                    next = new CacheEntryImplEx(nextRow.key(), nextRow.value(), nextRow.version());
+
+                    return true;
                 }
 
-                if (row != null)
-                    next = new IgniteBiTuple<>((K)row.key(), (V)row.value());
+                return false;
             }
+        };
+    }
+
+    /**
+     * @return Iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<KeyCacheObject> keysIterator() throws IgniteCheckedException {
+        final GridCursor<CacheDataRow> cur = cursor();
+
+        return new GridCloseableIteratorAdapter<KeyCacheObject>() {
+            /** */
+            private KeyCacheObject next;
 
-            @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException {
-                IgniteBiTuple<K, V> res = next;
+            @Override protected KeyCacheObject onNext() throws IgniteCheckedException {
+                KeyCacheObject ret = next;
 
                 next = null;
 
-                return res;
+                return ret;
             }
 
             @Override protected boolean onHasNext() throws IgniteCheckedException {
-                advance();
+                if (next != null)
+                    return true;
+
+                if (cur.next())
+                    next = cur.get().key();
 
                 return next != null;
             }
         };
     }
 
-    public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(boolean primary, boolean backup, AffinityTopologyVersion topVer)
-        throws IgniteCheckedException {
-        final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+    /**
+     * @param part Partition.
+     * @return Iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException {
+        final GridCursor<CacheDataRow> cur = cursor();
 
-        return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
-            @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException {
-                CacheDataRow row = cur.get();
+        return new GridCloseableIteratorAdapter<KeyCacheObject>() {
+            /** */
+            private KeyCacheObject next;
 
-                return new CacheEntryImplEx<>((K)row.key(), (V)row.value(), row.version());
+            @Override protected KeyCacheObject onNext() throws IgniteCheckedException {
+                KeyCacheObject res = next;
+
+                next = null;
+
+                return res;
             }
 
             @Override protected boolean onHasNext() throws IgniteCheckedException {
-                return cur.next();
-            }
-        };
-    }
+                if (next != null)
+                    return true;
 
-    public GridCloseableIterator<KeyCacheObject> keysIterator() throws IgniteCheckedException {
-        final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+                while (cur.next()) {
+                    CacheDataRow row = cur.get();
 
-        return new GridCloseableIteratorAdapter<KeyCacheObject>() {
-            @Override protected KeyCacheObject onNext() throws IgniteCheckedException {
-                CacheDataRow row = cur.get();
+                    if (row.partition() == part) {
+                        next = row.key();
 
-                return row.key();
-            }
+                        break;
+                    }
+                }
 
-            @Override protected boolean onHasNext() throws IgniteCheckedException {
-                return cur.next();
+                return next != null;
             }
         };
     }
 
-    public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException {
-        final GridCursor<? extends CacheDataRow> cur = dataTree.find(null, null);
+    public GridIterator<CacheDataRow> iterator(final boolean backups, final AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        final GridCursor<CacheDataRow> cur = cursor();
 
-        return new GridCloseableIteratorAdapter<KeyCacheObject>() {
-            private KeyCacheObject next;
+        return new GridCloseableIteratorAdapter<CacheDataRow>() {
+            /** */
+            private CacheDataRow next;
+
+            @Override protected CacheDataRow onNext() throws IgniteCheckedException {
+                CacheDataRow res = next;
+
+                next = null;
+
+                return res;
+            }
 
-            private void advance() throws IgniteCheckedException {
+            @Override protected boolean onHasNext() throws IgniteCheckedException {
                 if (next != null)
-                    return;
+                    return true;
 
                 while (cur.next()) {
                     CacheDataRow row = cur.get();
 
-                    if (row.partition() == part) {
-                        next = row.key();
+                    if (backups || cctx.affinity().primary(cctx.localNode(), row.partition(), topVer)) {
+                        next = row;
 
                         break;
                     }
                 }
+
+                return next != null;
             }
+        };
+    }
 
-            @Override protected KeyCacheObject onNext() throws IgniteCheckedException {
-                KeyCacheObject res = next;
+    public GridIterator<CacheDataRow> iterator(final int part) throws IgniteCheckedException {
+        final GridCursor<CacheDataRow> cur = cursor();
+
+        return new GridCloseableIteratorAdapter<CacheDataRow>() {
+            /** */
+            private CacheDataRow next;
+
+            @Override protected CacheDataRow onNext() throws IgniteCheckedException {
+                CacheDataRow res = next;
 
                 next = null;
 
@@ -471,7 +550,18 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
             }
 
             @Override protected boolean onHasNext() throws IgniteCheckedException {
-                advance();
+                if (next != null)
+                    return true;
+
+                while (cur.next()) {
+                    CacheDataRow row = cur.get();
+
+                    if (row.partition() == part) {
+                        next = row;
+
+                        break;
+                    }
+                }
 
                 return next != null;
             }
@@ -479,17 +569,79 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private GridCursor<CacheDataRow> cursor() throws IgniteCheckedException {
+        GridCursor<? extends CacheDataRow> cur1 = dataTree.find(null, null);
+
+        GridCursor<? extends CacheDataRow> cur2 = indexingEnabled ? cctx.queries().pkIndex().find(null, null) : null;
+
+        return cur2 != null ? new CompoundCursor(cur1, cur2) : cur1;
+    }
+
+    /**
+     *
+     */
+    static class CompoundCursor<T> implements GridCursor<T> {
+        /** */
+        private final GridCursor<T> c1;
+
+        /** */
+        private final GridCursor<T> c2;
+
+        /** */
+        private GridCursor<T> c;
+
+        /**
+         * @param c1 First cursor.
+         * @param c2 Second cursor.
+         */
+        public CompoundCursor(GridCursor<T> c1, GridCursor<T> c2) {
+            this.c1 = c1;
+            this.c2 = c2;
+
+            c = c1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() throws IgniteCheckedException {
+            if (c.next())
+                return true;
+
+            if (c == c1) {
+                c = c2;
+
+                return c.next();
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public T get() throws IgniteCheckedException {
+            return c.get();
+        }
+    }
+
+    /**
      *
      */
     static class KeySearchRow {
         /** */
         protected final KeyCacheObject key;
 
+        /** */
+        protected long link;
+
         /**
          * @param key Key.
+         * @param link Link.
          */
-        public KeySearchRow(KeyCacheObject key) {
+        public KeySearchRow(KeyCacheObject key, long link) {
             this.key = key;
+            this.link = link;
         }
 
         /** {@inheritDoc} */
@@ -511,9 +663,6 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
         /** */
         private int part;
 
-        /** */
-        private long link;
-
         /**
          * @param key Key.
          * @param val Value.
@@ -522,12 +671,11 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
          * @param link Link.
          */
         public DataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long link) {
-            super(key);
+            super(key, link);
 
             this.val = val;
             this.ver = ver;
             this.part = part;
-            this.link = link;
         }
 
         /** {@inheritDoc} */
@@ -577,6 +725,7 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
         private final GridCacheContext cctx;
 
         /**
+         * @param reuseList Reuse list.
          * @param rowStore Row store.
          * @param cctx Context.
          * @param pageMem Page memory.
@@ -603,14 +752,16 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
         }
 
         /** {@inheritDoc} */
-        @Override protected int compare(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx, KeySearchRow row) throws IgniteCheckedException {
+        @Override protected int compare(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx, KeySearchRow row)
+            throws IgniteCheckedException {
             KeySearchRow row0 = io.getLookupRow(this, buf, idx);
 
             return compareKeys(row0.key, row.key);
         }
 
         /** {@inheritDoc} */
-        @Override protected DataRow getRow(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx) throws IgniteCheckedException {
+        @Override protected DataRow getRow(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx)
+            throws IgniteCheckedException {
             long link = ((RowLinkIO)io).getLink(buf, idx);
 
             return rowStore.dataRow(link);
@@ -652,14 +803,30 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
             super(cctx, freeList);
         }
 
+        /**
+         * @param link Link.
+         * @return Search row.
+         * @throws IgniteCheckedException If failed.
+         */
         public KeySearchRow keySearchRow(long link) throws IgniteCheckedException {
             return getRow(link, KeyRowClosure.INSTANCE);
         }
 
+        /**
+         * @param link Link.
+         * @return Data row.
+         * @throws IgniteCheckedException If failed.
+         */
         public DataRow dataRow(long link) throws IgniteCheckedException {
             return getRow(link, DataRowClosure.INSTANCE);
         }
 
+        /**
+         * @param link Link.
+         * @param c Row closure.
+         * @return Row.
+         * @throws IgniteCheckedException If failed.
+         */
         private <T> T getRow(long link, RowClosure<T> c) throws IgniteCheckedException {
             try (Page page = page(pageId(link))) {
                 ByteBuffer buf = page.getForRead();
@@ -695,6 +862,13 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
          *
          */
         private interface RowClosure<T> {
+            /**
+             * @param buf Buffer.
+             * @param link Link.
+             * @param cctx Context.
+             * @return Row.
+             * @throws IgniteCheckedException If failed.
+             */
             T create(ByteBuffer buf, long link, GridCacheContext cctx) throws IgniteCheckedException;
         }
 
@@ -706,10 +880,11 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
             static final KeyRowClosure INSTANCE = new KeyRowClosure();
 
             /** {@inheritDoc} */
-            @Override public KeySearchRow create(ByteBuffer buf, long link, GridCacheContext cctx) throws IgniteCheckedException {
+            @Override public KeySearchRow create(ByteBuffer buf, long link, GridCacheContext cctx)
+                throws IgniteCheckedException {
                 KeyCacheObject key = cctx.cacheObjects().toKeyCacheObject(cctx.cacheObjectContext(), buf);
 
-                return new KeySearchRow(key);
+                return new KeySearchRow(key, link);
             }
         }
 
@@ -721,7 +896,8 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
             static final DataRowClosure INSTANCE = new DataRowClosure();
 
             /** {@inheritDoc} */
-            @Override public DataRow create(ByteBuffer buf, long link, GridCacheContext cctx) throws IgniteCheckedException {
+            @Override public DataRow create(ByteBuffer buf, long link, GridCacheContext cctx)
+                throws IgniteCheckedException {
                 KeyCacheObject key = cctx.cacheObjects().toKeyCacheObject(cctx.cacheObjectContext(), buf);
                 CacheObject val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), buf);
 
@@ -774,11 +950,9 @@ public class IgniteCacheOffheapManager extends GridCacheManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public void store(ByteBuffer buf, int idx, KeySearchRow row) {
-            DataRow row0 = (DataRow)row;
-
-            assert row0.link != 0;
+            assert row.link != 0;
 
-            setLink(buf, idx, row0.link);
+            setLink(buf, idx, row.link);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 6b5eac3..416d9ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -583,6 +583,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * Clears swap entries for evicted partition.
      */
     private void clearSwap() {
+        if (true) // TODO GG-10884.
+            return;
+
         assert state() == EVICTED;
         assert !GridQueryProcessor.isEnabled(cctx.config()) : "Indexing needs to have unswapped values.";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 130d403..78cc86f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -282,15 +282,7 @@ class GridDhtPartitionSupplier {
                     }
                 }
 
-                GridCacheEntryInfoCollectSwapListener swapLsnr = null;
-
                 try {
-                    if (phase == SupplyContextPhase.NEW && cctx.isOffHeapEnabled()) {
-                        swapLsnr = new GridCacheEntryInfoCollectSwapListener(log);
-
-                        cctx.swap().addOffHeapListener(part, swapLsnr);
-                    }
-
                     boolean partMissing = false;
 
                     if (phase == SupplyContextPhase.NEW)
@@ -321,12 +313,11 @@ class GridDhtPartitionSupplier {
                                         partIt,
                                         part,
                                         entIt,
-                                        swapLsnr,
+                                        null,
                                         loc,
                                         d.topologyVersion(),
                                         d.updateSequence());
 
-                                    swapLsnr = null;
                                     loc = null;
 
                                     reply(node, d, s, scId);
@@ -368,14 +359,14 @@ class GridDhtPartitionSupplier {
                                 phase,
                                 partIt,
                                 null,
-                                swapLsnr,
+                                null,
                                 part,
                                 loc,
                                 d.updateSequence());
                         }
                     }
 
-                    if (phase == SupplyContextPhase.SWAP && cctx.isOffHeapEnabled()) {
+                    if (phase == SupplyContextPhase.SWAP && false) {
                         GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
                             sctx != null && sctx.entryIt != null ?
                                 (GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>>)sctx.entryIt :
@@ -407,12 +398,11 @@ class GridDhtPartitionSupplier {
                                             partIt,
                                             part,
                                             iter,
-                                            swapLsnr,
+                                            null,
                                             loc,
                                             d.topologyVersion(),
                                             d.updateSequence());
 
-                                        swapLsnr = null;
                                         loc = null;
 
                                         reply(node, d, s, scId);
@@ -476,15 +466,6 @@ class GridDhtPartitionSupplier {
                         }
                     }
 
-                    if (swapLsnr == null && sctx != null)
-                        swapLsnr = sctx.swapLsnr;
-
-                    // Stop receiving promote notifications.
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
-
                     if (phase == SupplyContextPhase.SWAP) {
                         phase = SupplyContextPhase.EVICTED;
 
@@ -500,65 +481,6 @@ class GridDhtPartitionSupplier {
                         }
                     }
 
-                    if (phase == SupplyContextPhase.EVICTED && swapLsnr != null) {
-                        Collection<GridCacheEntryInfo> entries = swapLsnr.entries();
-
-                        swapLsnr = null;
-
-                        Iterator<GridCacheEntryInfo> lsnrIt = sctx != null && sctx.entryIt != null ?
-                            (Iterator<GridCacheEntryInfo>)sctx.entryIt : entries.iterator();
-
-                        while (lsnrIt.hasNext()) {
-                            if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
-                                // Demander no longer needs this partition,
-                                // so we send '-1' partition and move on.
-                                s.missed(part);
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Demanding node does not need requested partition " +
-                                        "[part=" + part + ", nodeId=" + id + ']');
-
-                                // No need to continue iteration over swap entries.
-                                break;
-                            }
-
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
-                                if (++bCnt >= maxBatchesCnt) {
-                                    saveSupplyContext(scId,
-                                        phase,
-                                        partIt,
-                                        part,
-                                        lsnrIt,
-                                        swapLsnr,
-                                        loc,
-                                        d.topologyVersion(),
-                                        d.updateSequence());
-
-                                    loc = null;
-
-                                    reply(node, d, s, scId);
-
-                                    return;
-                                }
-                                else {
-                                    if (!reply(node, d, s, scId))
-                                        return;
-
-                                    s = new GridDhtPartitionSupplyMessageV2(d.updateSequence(),
-                                        cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
-                                }
-                            }
-
-                            GridCacheEntryInfo info = lsnrIt.next();
-
-                            if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry(part, info, cctx);
-                            else if (log.isDebugEnabled())
-                                log.debug("Rebalance predicate evaluated to false (will not sender cache entry): " +
-                                    info);
-                        }
-                    }
-
                     // Mark as last supply message.
                     s.last(part);
 
@@ -569,11 +491,6 @@ class GridDhtPartitionSupplier {
                 finally {
                     if (loc != null)
                         loc.release();
-
-                    if (swapLsnr != null) {
-                        cctx.swap().removeOffHeapListener(part, swapLsnr);
-                        cctx.swap().removeSwapListener(part, swapLsnr);
-                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 17af4ba..75dc93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -421,7 +421,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      */
     public IgniteBiTuple<CacheObject, GridCacheVersion> read(KeyCacheObject key, int partId) throws IgniteCheckedException {
         if (!enterBusy())
-            return null; // Ignore index update when node is stopping.
+            return null;
 
         try {
             return qryProc.read(space, key, partId);
@@ -431,8 +431,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         }
     }
 
-    public List<BPlusTree<?, ? extends CacheDataRow>> pkIndexes() {
-        return qryProc.pkIndexes(space);
+    public BPlusTree<?, ? extends CacheDataRow> pkIndex() {
+        return qryProc.pkIndex(space);
     }
 
     /**
@@ -839,6 +839,87 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         try {
             injectResources(keyValFilter);
 
+            Integer part = qry.partition();
+
+            if (cctx.isLocal())
+                part = null;
+
+            if (part != null && (part < 0 || part >= cctx.affinity().partitions()))
+                return new GridEmptyCloseableIterator<>();
+
+            final ExpiryPolicy plc = cctx.expiry();
+
+            final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
+            final boolean backups = qry.includeBackups() || cctx.isReplicated();
+
+            final GridDhtLocalPartition locPart;
+
+            final GridIterator<CacheDataRow> it;
+
+            if (part != null) {
+                final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+
+                GridDhtLocalPartition locPart0 = dht.topology().localPartition(part, topVer, false);
+
+                if (locPart0 == null || locPart0.state() != OWNING || !locPart0.reserve())
+                    throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+                        "Partition can not be reserved");
+
+                if (locPart0.state() != OWNING) {
+                    locPart0.release();
+
+                    throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(),
+                        "Partition can not be reserved");
+                }
+
+                locPart = locPart0;
+
+                it = cctx.offheap().iterator(part);
+            }
+            else {
+                locPart = null;
+
+                it = cctx.offheap().iterator(backups, topVer);
+            }
+
+            return new PeekValueExpiryAwareIterator0(it, plc, topVer, keyValFilter, qry.keepBinary()) {
+                @Override protected void onClose() {
+                    super.onClose();
+
+                    if (locPart != null)
+                        locPart.release();
+
+                    closeScanFilter(keyValFilter);
+                }
+            };
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            closeScanFilter(keyValFilter);
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param qry Query.
+     * @return Full-scan row iterator.
+     * @throws IgniteCheckedException If failed to get iterator.
+     */
+    @SuppressWarnings({"unchecked"})
+    private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator0(final GridCacheQueryAdapter<?> qry)
+        throws IgniteCheckedException {
+        IgniteInternalCache<K, V> prj0 = cctx.cache();
+
+        prj0 = prj0.keepBinary();
+
+        final IgniteInternalCache prj = prj0;
+
+        final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
+
+        try {
+            injectResources(keyValFilter);
+
             final ExpiryPolicy plc = cctx.expiry();
 
             final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
@@ -3281,4 +3362,178 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             return true;
         }
     }
+
+    /**
+     *
+     */
+    private class PeekValueExpiryAwareIterator0 extends GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final ExpiryPolicy plc;
+
+        /** */
+        private final GridCacheAdapter cache;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /** */
+        private final GridDhtCacheAdapter dht;
+
+        /** */
+        private final IgniteBiPredicate<K, V> keyValFilter;
+
+        /** */
+        private final boolean keepBinary;
+
+        /** */
+        private IgniteBiTuple<K, V> next;
+
+        /** */
+        private IgniteCacheExpiryPolicy expiryPlc;
+
+        /** */
+        private GridIterator<CacheDataRow> it;
+
+        /**
+         * @param it Iterator.
+         * @param plc Expiry policy.
+         * @param topVer Topology version.
+         * @param keyValFilter Key-value filter.
+         * @param keepBinary Keep binary flag from the query.
+         */
+        private PeekValueExpiryAwareIterator0(
+            GridIterator<CacheDataRow> it,
+            ExpiryPolicy plc,
+            AffinityTopologyVersion topVer,
+            IgniteBiPredicate<K, V> keyValFilter,
+            boolean keepBinary
+        ) {
+            this.it = it;
+            this.plc = plc;
+            this.topVer = topVer;
+            this.keyValFilter = keyValFilter;
+
+            dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+            cache = dht != null ? dht : cctx.cache();
+
+            this.keepBinary = keepBinary;
+            expiryPlc = cctx.cache().expiryPolicy(plc);
+
+            advance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onHasNext() {
+            return next != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteBiTuple<K, V> onNext() {
+            if (next == null)
+                throw new NoSuchElementException();
+
+            IgniteBiTuple<K, V> next0 = next;
+
+            advance();
+
+            return next0;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() {
+            sendTtlUpdate();
+        }
+
+        /**
+         * Moves the iterator to the next cache entry.
+         */
+        private void advance() {
+            IgniteBiTuple<K, V> next0 = null;
+
+            while (it.hasNext()) {
+                CacheDataRow row = it.next();
+
+                KeyCacheObject key = row.key();
+
+                CacheObject val;
+
+                if (expiryPlc != null) {
+                    try {
+                        val = value(key);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to peek value: " + e);
+
+                        val = null;
+                    }
+
+                    if (dht != null && expiryPlc.readyToFlush(100)) {
+                        dht.sendTtlUpdateRequest(expiryPlc);
+
+                        expiryPlc = cctx.cache().expiryPolicy(plc);
+                    }
+                }
+                else
+                    val = row.value();
+
+                if (val != null) {
+                    if (keyValFilter != null) {
+                        K k0 = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary);
+                        V v0 = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary);
+
+                        if (!keyValFilter.apply(k0, v0))
+                            continue;
+                    }
+
+                    next0 = F.t((K)key, (V)val);
+
+                    break;
+                }
+            }
+
+            next = next0;
+
+            if (next == null)
+                sendTtlUpdate();
+        }
+
+        /**
+         * Sends TTL update.
+         */
+        private void sendTtlUpdate() {
+            if (dht != null && expiryPlc != null) {
+                dht.sendTtlUpdateRequest(expiryPlc);
+
+                expiryPlc = null;
+            }
+        }
+
+        /**
+         * @param key Key.
+         * @return Value.
+         * @throws IgniteCheckedException If failed to peek value.
+         */
+        private CacheObject value(KeyCacheObject key) throws IgniteCheckedException {
+            GridCacheEntryEx entry = null;
+
+            try {
+                entry = cache.entryEx(key);
+
+                entry.unswap();
+
+                return entry.peek(true, false, false, topVer, expiryPlc);
+            }
+            catch (GridCacheEntryRemovedException ignore) {
+                return null;
+            }
+            finally {
+                if (entry != null)
+                    cctx.evicts().touch(entry, topVer);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 9a03bdc..345f9b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -19,21 +19,25 @@ package org.apache.ignite.internal.processors.cache.query;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -155,7 +159,50 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
             metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
 
         if (data == null)
-            data = unmarshalCollection(dataBytes, ctx, ldr);
+            data = unmarshalCollection0(dataBytes, ctx, ldr);
+    }
+
+    /**
+     * @param byteCol Collection to unmarshal.
+     * @param ctx Context.
+     * @param ldr Loader.
+     * @return Unmarshalled collection.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected <T> List<T> unmarshalCollection0(@Nullable Collection<byte[]> byteCol,
+        GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        assert ldr != null;
+        assert ctx != null;
+
+        if (byteCol == null)
+            return null;
+
+        List<T> col = new ArrayList<>(byteCol.size());
+
+        Marshaller marsh = ctx.marshaller();
+
+        ClassLoader ldr0 = U.resolveClassLoader(ldr, ctx.gridConfig());
+
+        CacheObjectContext cacheObjCtx = null;
+
+        for (byte[] bytes : byteCol) {
+            Object obj = bytes == null ? null : marsh.<T>unmarshal(bytes, ldr0);
+
+            if (obj instanceof Map.Entry) {
+                Object key = ((Map.Entry)obj).getKey();
+
+                if (key instanceof KeyCacheObject) {
+                    if (cacheObjCtx == null)
+                        cacheObjCtx = ctx.cacheContext(cacheId).cacheObjectContext();
+
+                    ((KeyCacheObject)key).finishUnmarshal(cacheObjCtx, ldr0);
+                }
+            }
+
+            col.add((T)obj);
+        }
+
+        return col;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 5972689..b294465 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -210,11 +210,17 @@ public interface GridQueryIndexing {
     /**
      * @param space Space name.
      * @param key Key.
+     * @param partId Partition.
      * @return Read versioned value.
      */
-    IgniteBiTuple<CacheObject,GridCacheVersion> read(String space, KeyCacheObject key, int partId) throws IgniteCheckedException;
+    IgniteBiTuple<CacheObject, GridCacheVersion> read(String space, KeyCacheObject key, int partId)
+        throws IgniteCheckedException;
 
-    public List<BPlusTree<?, ? extends CacheDataRow>> pkIndexes(String space);
+    /**
+     * @param space Space name.
+     * @return Primary key index.
+     */
+    public BPlusTree<?, ? extends CacheDataRow> pkIndex(String space);
 
     /**
      * Will be called when entry with given key is swapped.

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index abfa15a..d2be065 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -730,7 +730,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
     }
 
-    public List<BPlusTree<?, ? extends CacheDataRow>> pkIndexes(String space) {
+    public BPlusTree<?, ? extends CacheDataRow> pkIndex(String space) {
         if (idx == null)
             return null;
 
@@ -738,7 +738,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to write to index (grid is stopping).");
 
         try {
-            return idx.pkIndexes(space);
+            return idx.pkIndex(space);
         }
         finally {
             busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/database/IgniteDbPutGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/database/IgniteDbPutGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/database/IgniteDbPutGetTest.java
deleted file mode 100644
index e938291..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/database/IgniteDbPutGetTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.database;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DatabaseConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- *
- */
-public class IgniteDbPutGetTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        DatabaseConfiguration dbCfg = new DatabaseConfiguration();
-
-        dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
-
-        dbCfg.setPageSize(256);
-
-        dbCfg.setPageCacheSize(100 * 1024 * 1024);
-
-        cfg.setDatabaseConfiguration(dbCfg);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGrids(1);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSizeClear() throws Exception {
-        Ignite ignite = ignite(0);
-
-        IgniteCache<Integer, Integer> cache = ignite.cache(null);
-
-        cache.put(1, 1);
-        cache.put(2, 2);
-
-        assertEquals(2, cache.size());
-
-        cache.clear();
-
-        assertNull(cache.get(1));
-        assertNull(cache.get(2));
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPutGet() throws Exception {
-        Ignite ignite = ignite(0);
-
-        IgniteCache<Integer, Integer> cache = ignite.cache(null);
-
-        cache.put(1, 1);
-        cache.put(2, 2);
-
-        assertEquals(1, (Object)cache.get(1));
-
-        cache.remove(1);
-        assertNull(cache.get(1));
-        assertEquals(2, (Object)cache.get(2));
-
-        cache.remove(2);
-
-        assertNull(cache.get(2));
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43de3d67/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMultiNodePutGetTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMultiNodePutGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMultiNodePutGetTest.java
new file mode 100644
index 0000000..0f880e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbMultiNodePutGetTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.database;
+
+/**
+ *
+ */
+public class IgniteDbMultiNodePutGetTest extends IgniteDbPutGetAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean indexingEnabled() {
+        return false;
+    }
+}


Mime
View raw message