ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [10/43] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Thu, 08 Jun 2017 12:33:03 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 955ca69..a25d794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.cache.database;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
 import org.apache.ignite.internal.processors.cache.IncompleteObject;
@@ -94,25 +93,26 @@ public class CacheDataRowAdapter implements CacheDataRow {
     /**
      * Read row from data pages.
      *
-     * @param cctx Cache context.
+     * @param grp Cache group.
      * @param rowData Required row data.
      * @throws IgniteCheckedException If failed.
      */
-    public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException {
-        initFromLink(cctx, cctx.shared(), cctx.memoryPolicy().pageMemory(), rowData);
+    public final void initFromLink(CacheGroupContext grp, RowData rowData) throws IgniteCheckedException {
+        initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData);
     }
 
     /**
      * Read row from data pages.
      * Can be called with cctx == null, if cache instance is unknown, but its ID is stored in the data row.
      *
-     * @param cctx Cctx.
+     * @param grp Cache group.
      * @param sharedCtx Shared context.
      * @param pageMem Page memory.
      * @param rowData Row data.
+     * @throws IgniteCheckedException If failed.
      */
     public final void initFromLink(
-        @Nullable GridCacheContext<?, ?> cctx,
+        @Nullable CacheGroupContext grp,
         GridCacheSharedContext<?, ?> sharedCtx,
         PageMemory pageMem,
         RowData rowData)
@@ -120,14 +120,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
         assert link != 0 : "link";
         assert key == null : "key";
 
-        CacheObjectContext coctx = null;
-
-        if (cctx != null) {
-            cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
-                cctx.cacheId() : 0; // Force cacheId reading for evictable memory policies.
+        CacheObjectContext coctx = grp != null ?  grp.cacheObjectContext() : null;
 
-            coctx = cctx.cacheObjectContext();
-        }
+        boolean readCacheId = grp == null || grp.storeCacheIdInDataPage();
 
         long nextLink = link;
         IncompleteObject<?> incomplete = null;
@@ -135,10 +130,16 @@ public class CacheDataRowAdapter implements CacheDataRow {
 
         do {
             final long pageId = pageId(nextLink);
-            final long page = pageMem.acquirePage(cacheId, pageId);
+
+            // Group is null only when a page is being evicted; with persistence enabled, evictions must be disabled.
+            assert grp != null || !sharedCtx.database().persistenceEnabled();
+
+            int grpId = grp != null ? grp.groupId() : 0;
+
+            final long page = pageMem.acquirePage(grpId, pageId);
 
             try {
-                long pageAddr = pageMem.readLock(cacheId, pageId, page); // Non-empty data page must not be recycled.
+                long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled.
 
                 assert pageAddr != 0L : nextLink;
 
@@ -154,7 +155,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
                     if (first) {
                         if (nextLink == 0) {
                             // Fast path for a single page row.
-                            readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData);
+                            readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
 
                             return;
                         }
@@ -169,17 +170,17 @@ public class CacheDataRowAdapter implements CacheDataRow {
 
                     boolean keyOnly = rowData == RowData.KEY_ONLY;
 
-                    incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, incomplete);
+                    incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
 
                     if (keyOnly && key != null)
                         return;
                 }
                 finally {
-                    pageMem.readUnlock(cacheId, pageId, page);
+                    pageMem.readUnlock(grpId, pageId, page);
                 }
             }
             finally {
-                pageMem.releasePage(cacheId, pageId, page);
+                pageMem.releasePage(grpId, pageId, page);
             }
         }
         while(nextLink != 0);
@@ -188,9 +189,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param sharedCtx Cache shared context.
      * @param coctx Cache object context.
      * @param buf Buffer.
      * @param keyOnly {@code true} If need to read only key object.
+     * @param readCacheId {@code true} If need to read cache ID.
      * @param incomplete Incomplete object.
      * @throws IgniteCheckedException If failed.
      * @return Read object.
@@ -200,9 +203,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
         CacheObjectContext coctx,
         ByteBuffer buf,
         boolean keyOnly,
+        boolean readCacheId,
         IncompleteObject<?> incomplete
     ) throws IgniteCheckedException {
-        if (cacheId == 0) {
+        if (readCacheId && cacheId == 0) {
             incomplete = readIncompleteCacheId(buf, incomplete);
 
             if (cacheId == 0)
@@ -211,8 +215,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
             incomplete = null;
         }
 
-        if (coctx == null)
+        if (coctx == null) {
+            // coctx can be null only when grp is null too; this means that
+            // we are in the process of eviction and cacheId is a mandatory part of the data.
+            assert cacheId != 0;
+
             coctx = sharedCtx.cacheContext(cacheId).cacheObjectContext();
+        }
 
         // Read key.
         if (key == null) {
@@ -251,20 +260,23 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param sharedCtx Cache shared context.
      * @param coctx Cache object context.
      * @param addr Address.
      * @param rowData Required row data.
+     * @param readCacheId {@code true} If need to read cache ID.
      * @throws IgniteCheckedException If failed.
      */
     private void readFullRow(
         GridCacheSharedContext<?, ?> sharedCtx,
         CacheObjectContext coctx,
         long addr,
-        RowData rowData)
+        RowData rowData,
+        boolean readCacheId)
         throws IgniteCheckedException {
         int off = 0;
 
-        if (cacheId == 0) {
+        if (readCacheId) {
             cacheId = PageUtils.getInt(addr, off);
 
             off += 4;
@@ -328,6 +340,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
             if (remaining >= size) {
                 cacheId = buf.getInt();
 
+                assert cacheId != 0;
+
                 return null;
             }
 
@@ -342,6 +356,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
             timeBuf.order(buf.order());
 
             cacheId = timeBuf.getInt();
+
+            assert cacheId != 0;
         }
 
         return incomplete;
@@ -401,7 +417,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
      * @param buf Buffer.
      * @param incomplete Incomplete object.
      * @return Incomplete object.
-     * @throws IgniteCheckedException If failed.
      */
     private IncompleteObject<?> readIncompleteExpireTime(
         ByteBuffer buf,

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
index d51cf0e..6e429c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
@@ -37,4 +37,9 @@ public interface CacheSearchRow {
      * @return Key hash code.
      */
     public int hash();
+
+    /**
+     * @return Cache ID or {@code 0} if cache ID is not defined.
+     */
+    public int cacheId();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
index 1c4c89e..0e0faef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
@@ -19,8 +19,9 @@ package org.apache.ignite.internal.processors.cache.database;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
 
 /**
@@ -31,27 +32,27 @@ public class RowStore {
     private final FreeList freeList;
 
     /** */
-    protected final PageMemory pageMem;
+    private final GridCacheSharedContext ctx;
 
     /** */
-    protected final GridCacheContext<?,?> cctx;
+    protected final PageMemory pageMem;
 
     /** */
     protected final CacheObjectContext coctx;
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      * @param freeList Free list.
      */
-    public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) {
-        assert cctx != null;
+    public RowStore(CacheGroupContext grp, FreeList freeList) {
+        assert grp != null;
         assert freeList != null;
 
-        this.cctx = cctx;
         this.freeList = freeList;
 
-        coctx = cctx.cacheObjectContext();
-        pageMem = cctx.memoryPolicy().pageMemory();
+        ctx = grp.shared();
+        coctx = grp.cacheObjectContext();
+        pageMem = grp.memoryPolicy().pageMemory();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index 4d2110c..d92f811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -884,11 +884,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
     /**
      * @param upper Upper bound.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    private GridCursor<T> findLowerUnbounded(L upper) throws IgniteCheckedException {
-        ForwardCursor cursor = new ForwardCursor(null, upper);
+    private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException {
+        ForwardCursor cursor = new ForwardCursor(null, upper, x);
 
         long firstPageId;
 
@@ -933,14 +934,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+    @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+        return find(lower, upper, null);
+    }
+
+    /**
+     * @param lower Lower bound inclusive or {@code null} if unbounded.
+     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
             if (lower == null)
-                return findLowerUnbounded(upper);
+                return findLowerUnbounded(upper, x);
 
-            ForwardCursor cursor = new ForwardCursor(lower, upper);
+            ForwardCursor cursor = new ForwardCursor(lower, upper, x);
 
             cursor.find();
 
@@ -2072,6 +2084,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         try {
             long metaPageAddr = writeLock(metaPageId, metaPage); // No checks, we must be out of use.
 
+            assert metaPageAddr != 0L;
+
             try {
                 for (long pageId : getFirstPageIds(metaPageAddr)) {
                     assert pageId != 0;
@@ -4390,6 +4404,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         private final L upperBound;
 
+        /** */
+        private final Object x;
+
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
@@ -4397,6 +4414,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         ForwardCursor(L lowerBound, L upperBound) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
+            this.x = null;
+        }
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         */
+        ForwardCursor(L lowerBound, L upperBound, Object x) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+            this.x = x;
         }
 
         /**
@@ -4513,7 +4542,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                 rows = (T[])new Object[cnt];
 
             for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i);
+                T r = getRow(io, pageAddr, startIdx + i, x);
 
                 rows = GridArrays.set(rows, i, r);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index 2586696..927446b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -157,6 +157,18 @@ public abstract class PageIO {
     /** */
     public static final short T_PAGE_UPDATE_TRACKING = 15;
 
+    /** */
+    public static final short T_CACHE_ID_AWARE_DATA_REF_INNER = 16;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_DATA_REF_LEAF = 17;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_PENDING_REF_INNER = 18;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_PENDING_REF_LEAF = 19;
+
     /** Index for payload == 1. */
     public static final short T_H2_EX_REF_LEAF_START = 10000;
 
@@ -484,6 +496,12 @@ public abstract class PageIO {
             case T_DATA_REF_LEAF:
                 return (Q)IgniteCacheOffheapManagerImpl.DataLeafIO.VERSIONS.forVersion(ver);
 
+            case T_CACHE_ID_AWARE_DATA_REF_INNER:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataInnerIO.VERSIONS.forVersion(ver);
+
+            case T_CACHE_ID_AWARE_DATA_REF_LEAF:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataLeafIO.VERSIONS.forVersion(ver);
+
             case T_METASTORE_INNER:
                 return (Q)MetadataStorage.MetaStoreInnerIO.VERSIONS.forVersion(ver);
 
@@ -496,6 +514,12 @@ public abstract class PageIO {
             case T_PENDING_REF_LEAF:
                 return (Q)IgniteCacheOffheapManagerImpl.PendingEntryLeafIO.VERSIONS.forVersion(ver);
 
+            case T_CACHE_ID_AWARE_PENDING_REF_INNER:
+                return (Q) IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryInnerIO.VERSIONS.forVersion(ver);
+
+            case T_CACHE_ID_AWARE_PENDING_REF_LEAF:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryLeafIO.VERSIONS.forVersion(ver);
+
             default:
                 // For tests.
                 if (innerTestIO != null && innerTestIO.getType() == type && innerTestIO.getVersion() == ver)

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 5d1885e..c092132 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class GridCacheTtlUpdateRequest extends GridCacheMessage {
+public class GridCacheTtlUpdateRequest extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 630c79f..fc209aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -24,7 +24,7 @@ import java.util.Collections;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -37,7 +37,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Base for all messages in replicated cache.
  */
-public abstract class GridDistributedBaseMessage extends GridCacheMessage implements GridCacheDeployable,
+public abstract class GridDistributedBaseMessage extends GridCacheIdMessage implements GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 096ca9f..dc9e4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -79,10 +79,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
     /**
      * @param ctx Cache registry.
-     * @param startSize Start size.
      */
-    protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, int startSize) {
-        super(ctx, startSize);
+    protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx) {
+        super(ctx);
     }
 
     /**
@@ -279,11 +278,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             IgniteCacheOffheapManager offheap = ctx.offheap();
 
             if (modes.offheap)
-                size += offheap.entriesCount(modes.primary, modes.backup, topVer);
+                size += offheap.cacheEntriesCount(ctx.cacheId(), modes.primary, modes.backup, topVer);
             else if (modes.heap) {
                 for (GridDhtLocalPartition locPart : ctx.topology().currentLocalPartitions()) {
                     if ((modes.primary && locPart.primary(topVer)) || (modes.backup && locPart.backup(topVer)))
-                        size += locPart.publicSize();
+                        size += locPart.publicSize(ctx.cacheId());
                 }
             }
         }
@@ -292,7 +291,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
+    @Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
         PeekModes modes = parsePeekModes(peekModes, true);
 
         long size = 0;
@@ -306,9 +305,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
             IgniteCacheOffheapManager offheap = ctx.offheap();
 
-            if (ctx.affinity().primaryByPartition(ctx.localNode(), partition, topVer) && modes.primary ||
-                ctx.affinity().backupByPartition(ctx.localNode(), partition, topVer) && modes.backup)
-                size += offheap.entriesCount(partition);
+            if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary ||
+                ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup)
+                size += offheap.cacheEntriesCount(ctx.cacheId(), part);
         }
 
         return size;
@@ -460,7 +459,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                             return false;
 
                         try {
-                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().keysIterator(part);
+                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
 
                             if (iter != null) {
                                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 561c292..c36e633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -71,6 +71,16 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public final int partition() {
         return part;
     }
@@ -135,25 +145,25 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -175,7 +185,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -183,7 +193,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -191,7 +201,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 part = reader.readInt("part");
 
                 if (!reader.isLastRead())
@@ -199,7 +209,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -219,7 +229,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9cb04d4..299fcf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -594,6 +594,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         dhtVer,
                                                         txEntry.updateCounter());
                                                 else {
+                                                    assert val != null : txEntry;
+
                                                     cached.innerSet(this,
                                                         eventNodeId(),
                                                         nodeId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 76c7a15..3b41ffa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -38,24 +38,25 @@ import org.jetbrains.annotations.Nullable;
  * An implementation of GridCacheConcurrentMap that will delegate all method calls to corresponding local partition.
  */
 public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap {
-    /** Context. */
-    private final GridCacheContext ctx;
+    /** Cache group. */
+    private final CacheGroupContext grp;
 
     /**
-     * Constructor.
-     * @param ctx Context.
+     * @param grp Cache group.
      */
-    public GridCachePartitionedConcurrentMap(GridCacheContext ctx) {
-        this.ctx = ctx;
+    GridCachePartitionedConcurrentMap(CacheGroupContext grp) {
+        this.grp = grp;
     }
 
     /**
+     * @param cctx Cache context.
      * @param key Key.
      * @param topVer Topology version.
      * @param create Create flag.
      * @return Local partition.
      */
     @Nullable private GridDhtLocalPartition localPartition(
+        GridCacheContext cctx,
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean create
@@ -63,33 +64,33 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
         int p = key.partition();
 
         if (p == -1)
-            p = ctx.affinity().partition(key);
+            p = cctx.affinity().partition(key);
 
-        return ctx.topology().localPartition(p, topVer, create);
+        return grp.topology().localPartition(p, topVer, create);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
-        GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false);
+    @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
+        GridDhtLocalPartition part = localPartition(ctx, key, AffinityTopologyVersion.NONE, false);
 
         if (part == null)
             return null;
 
-        return part.getEntry(key);
+        return part.getEntry(ctx, key);
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer,
         KeyCacheObject key,
         boolean create,
         boolean touch) {
         while (true) {
-            GridDhtLocalPartition part = localPartition(key, topVer, create);
+            GridDhtLocalPartition part = localPartition(ctx, key, topVer, create);
 
             if (part == null)
                 return null;
 
-            GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch);
+            GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(ctx, topVer, key, create, touch);
 
             if (res != null || !create)
                 return res;
@@ -102,35 +103,35 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     @Override public int internalSize() {
         int size = 0;
 
-        for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+        for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
             size += part.internalSize();
 
         return size;
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
+    @Override public int publicSize(int cacheId) {
         int size = 0;
 
-        for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
-            size += part.publicSize();
+        for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
+            size += part.publicSize(cacheId);
 
         return size;
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
     @Override public boolean removeEntry(GridCacheEntryEx entry) {
-        GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false);
+        GridDhtLocalPartition part = localPartition(entry.context(), entry.key(), AffinityTopologyVersion.NONE, false);
 
         if (part == null)
             return false;
@@ -139,12 +140,12 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     }
 
     /** {@inheritDoc} */
-    @Override public Iterable<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) {
+    @Override public Iterable<GridCacheMapEntry> entries(final int cacheId, final CacheEntryPredicate... filter) {
         return new Iterable<GridCacheMapEntry>() {
             @Override public Iterator<GridCacheMapEntry> iterator() {
                 return new PartitionedIterator<GridCacheMapEntry>() {
                     @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) {
-                        return part.entries(filter).iterator();
+                        return part.entries(cacheId, filter).iterator();
                     }
                 };
             }
@@ -152,23 +153,10 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     }
 
     /** {@inheritDoc} */
-    @Override public Iterable<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) {
-        return new Iterable<GridCacheMapEntry>() {
-            @Override public Iterator<GridCacheMapEntry> iterator() {
-                return new PartitionedIterator<GridCacheMapEntry>() {
-                    @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) {
-                        return part.allEntries(filter).iterator();
-                    }
-                };
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public Set<GridCacheMapEntry> entrySet(final CacheEntryPredicate... filter) {
+    @Override public Set<GridCacheMapEntry> entrySet(final int cacheId, final CacheEntryPredicate... filter) {
         return new PartitionedSet<GridCacheMapEntry>() {
             @Override protected Set<GridCacheMapEntry> set(GridDhtLocalPartition part) {
-                return part.entrySet(filter);
+                return part.entrySet(cacheId, filter);
             }
         };
     }
@@ -178,7 +166,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
      */
     private abstract class PartitionedIterator<T> implements Iterator<T> {
         /** Partitions iterator. */
-        private Iterator<GridDhtLocalPartition> partsIter = ctx.topology().currentLocalPartitions().iterator();
+        private Iterator<GridDhtLocalPartition> partsIter = grp.topology().currentLocalPartitions().iterator();
 
         /** Current partition iterator. */
         private Iterator<T> currIter = partsIter.hasNext() ? iterator(partsIter.next()) :
@@ -242,7 +230,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
         @Override public int size() {
             int size = 0;
 
-            for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+            for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
                 size += set(part).size();
 
             return size;
@@ -250,7 +238,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
 
         /** {@inheritDoc} */
         @Override public boolean contains(Object o) {
-            for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) {
+            for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
                 if (set(part).contains(o))
                     return true;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 43bc609..619630f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -71,7 +71,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private GridCacheSharedContext cctx;
 
     /** Cache ID. */
-    private int cacheId;
+    private int grpId;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -111,18 +111,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /**
      * @param cctx Context.
-     * @param cacheId Cache ID.
+     * @param grpId Group ID.
      * @param exchFut Exchange ID.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext cctx,
-        int cacheId,
+        int grpId,
         GridDhtPartitionsExchangeFuture exchFut,
         Object similarAffKey
     ) {
         this.cctx = cctx;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.similarAffKey = similarAffKey;
 
         topVer = exchFut.topologyVersion();
@@ -166,8 +166,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cacheId;
+    @Override public int groupId() {
+        return grpId;
     }
 
     /** {@inheritDoc} */
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         long updateSeq = this.updateSeq.incrementAndGet();
 
         // If this is the oldest node.
-        if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) {
+        if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) {
             if (node2part == null) {
                 node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -353,8 +353,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
-        return localPartition(1, AffinityTopologyVersion.NONE, create);
+    @Override public GridDhtLocalPartition localPartition(int p) {
+        return localPartition(p, AffinityTopologyVersion.NONE, false);
     }
 
     /** {@inheritDoc} */
@@ -542,7 +542,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         lock.readLock().lock();
 
         try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+            if (stopping || node2part == null)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
                 ", locNodeId=" + cctx.localNodeId() +
                 ", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
 
@@ -1006,7 +1009,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cacheId=" + cacheId + ']');
+            ", grpId=" + grpId + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index f80adc5..d9d642a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Affinity assignment request.
  */
-public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
+public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -46,17 +46,17 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
     /**
      * @param futId Future ID.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      */
     public GridDhtAffinityAssignmentRequest(
         long futId,
-        int cacheId,
+        int grpId,
         AffinityTopologyVersion topVer) {
         assert topVer != null;
 
         this.futId = futId;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.topVer = topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 5d82171..4df3fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -27,20 +27,18 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Affinity assignment response.
  */
-public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
+public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -73,17 +71,17 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /**
      * @param futId Future ID.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
     public GridDhtAffinityAssignmentResponse(
         long futId,
-        int cacheId,
+        int grpId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
         this.futId = futId;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 741ca5e..8746320 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -74,27 +74,27 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private final AffinityTopologyVersion topVer;
 
     /** */
-    private final int cacheId;
+    private final int grpId;
 
     /**
      * @param ctx Context.
-     * @param cacheDesc Cache descriptor.
+     * @param grpDesc Group descriptor.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        DynamicCacheDescriptor cacheDesc,
+        CacheGroupDescriptor grpDesc,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
         this.topVer = topVer;
-        this.cacheId = cacheDesc.cacheId();
+        this.grpId = grpDesc.groupId();
         this.ctx = ctx;
 
         id = idGen.getAndIncrement();
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
+        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -112,10 +112,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * @return Cache ID.
+     * @return Cache group ID.
      */
-    public int cacheId() {
-        return cacheId;
+    public int groupId() {
+        return grpId;
     }
 
     /**
@@ -195,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
+                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index c3d5f88..418d712 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
-import java.util.AbstractSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
@@ -34,13 +32,15 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
@@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdat
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -70,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CI3;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
@@ -86,8 +88,11 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache adapter.
@@ -96,18 +101,36 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Topology. */
-    private GridDhtPartitionTopologyImpl top;
-
-    /** Preloader. */
-    protected GridCachePreloader preldr;
-
     /** Multi tx future holder. */
     private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
 
     /** Multi tx futures. */
     private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>();
 
+    /** Force key futures. */
+    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
+
+    /** */
+    private volatile boolean stopping;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
+        @Override public void onEvent(Event evt) {
+            DiscoveryEvent e = (DiscoveryEvent)evt;
+
+            ClusterNode loc = ctx.localNode();
+
+            assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : e;
+
+            final ClusterNode n = e.eventNode();
+
+            assert !loc.id().equals(n.id());
+
+            for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
+                f.onDiscoveryEvent(e);
+        }
+    };
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -116,6 +139,176 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * Adds future to future map.
+     *
+     * @param fut Future to add.
+     * @return {@code False} if node cache is stopping and future was completed with error.
+     */
+    public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.put(fut.futureId(), fut);
+
+        if (stopping) {
+            fut.onDone(stopError());
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Removes future from future map.
+     *
+     * @param fut Future to remove.
+     */
+    public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.remove(fut.futureId(), fut);
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     */
+    protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
+        GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
+
+        if (f != null)
+            f.onResult(msg);
+        else if (log.isDebugEnabled())
+            log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() +
+                ", res=" + msg + ']');
+    }
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
+        IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
+
+        if (fut.isDone())
+            processForceKeysRequest0(node, msg);
+        else
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    processForceKeysRequest0(node, msg);
+                }
+            });
+    }
+
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
+        try {
+            ClusterNode loc = ctx.localNode();
+
+            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+                ctx.cacheId(),
+                msg.futureId(),
+                msg.miniId(),
+                ctx.deploymentEnabled());
+
+            GridDhtPartitionTopology top = ctx.topology();
+
+            for (KeyCacheObject k : msg.keys()) {
+                int p = ctx.affinity().partition(k);
+
+                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
+
+                // If this node is no longer an owner.
+                if (locPart == null && !top.owners(p).contains(loc)) {
+                    res.addMissed(k);
+
+                    continue;
+                }
+
+                GridCacheEntryEx entry;
+
+                while (true) {
+                    try {
+                        entry = ctx.dht().entryEx(k);
+
+                        entry.unswap();
+
+                        GridCacheEntryInfo info = entry.info();
+
+                        if (info == null) {
+                            assert entry.obsolete() : entry;
+
+                            continue;
+                        }
+
+                        if (!info.isNew())
+                            res.addInfo(info);
+
+                        ctx.evicts().touch(entry, msg.topologyVersion());
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry: " + k);
+                    }
+                    catch (GridDhtInvalidPartitionException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Local node is no longer an owner: " + p);
+
+                        res.addMissed(k);
+
+                        break;
+                    }
+                }
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
+
+            ctx.io().send(node, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() +
+                    ", req=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
+        }
+    }
+
+    /**
+     *
+     */
+    public void dumpDebugInfo() {
+        if (!forceKeyFuts.isEmpty()) {
+            U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:");
+
+            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+                U.warn(log, ">>> " + fut);
+        }
+    }
+
+    @Override public void onKernalStop() {
+        super.onKernalStop();
+
+        stopping = true;
+
+        IgniteCheckedException err = stopError();
+
+        for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+            fut.onDone(err);
+
+        ctx.gridEvents().removeLocalEventListener(discoLsnr);
+    }
+
+    /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
+    }
+
+    /**
      * @param nodeId Sender node ID.
      * @param res Near get response.
      */
@@ -160,7 +353,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param ctx Context.
      */
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
-        this(ctx, new GridCachePartitionedConcurrentMap(ctx));
+        this(ctx, new GridCachePartitionedConcurrentMap(ctx.group()));
     }
 
     /**
@@ -174,83 +367,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override protected void init() {
-        super.init();
-
-        top = new GridDhtPartitionTopologyImpl(ctx, entryFactory());
-    }
-
-    /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        super.start();
-
-        ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() {
             @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) {
                 processTtlUpdateRequest(req);
             }
         });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop() {
-        super.stop();
-
-        if (preldr != null)
-            preldr.stop();
-
-        // Clean up to help GC.
-        preldr = null;
-        top = null;
-    }
 
-    /** {@inheritDoc} */
-    @Override public void onReconnected() {
-        super.onReconnected();
-
-        ctx.affinity().onReconnected();
-
-        top.onReconnected();
-
-        if (preldr != null)
-            preldr.onReconnected();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (preldr != null)
-            preldr.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop() {
-        super.onKernalStop();
-
-        if (preldr != null)
-            preldr.onKernalStop();
+        ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         super.printMemoryStats();
 
-        top.printMemoryStats(1024);
-    }
-
-    /**
-     * @return Cache map entry factory.
-     */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtCacheEntry(ctx, topVer, key);
-            }
-        };
+        ctx.group().topology().printMemoryStats(1024);
     }
 
     /**
@@ -262,21 +393,12 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @return Partition topology.
      */
     public GridDhtPartitionTopology topology() {
-        return top;
+        return ctx.group().topology();
     }
 
     /** {@inheritDoc} */
     @Override public GridCachePreloader preloader() {
-        return preldr;
-    }
-
-    /**
-     * @return DHT preloader.
-     */
-    public GridDhtPreloader dhtPreloader() {
-        assert preldr instanceof GridDhtPreloader;
-
-        return (GridDhtPreloader)preldr;
+        return ctx.group().preloader();
     }
 
     /**
@@ -300,6 +422,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (tup != null)
             throw new IgniteCheckedException("Nested multi-update locks are not supported");
 
+        GridDhtPartitionTopology top = ctx.group().topology();
+
         top.readLock();
 
         GridDhtTopologyFuture topFut;
@@ -342,7 +466,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (tup == null)
             throw new IgniteCheckedException("Multi-update was not started or released twice.");
 
-        top.readLock();
+        ctx.group().topology().readLock();
 
         try {
             IgniteUuid lockId = tup.get1();
@@ -355,7 +479,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             multiFut.onDone(lockId);
         }
         finally {
-            top.readUnlock();
+            ctx.group().topology().readUnlock();
         }
     }
 
@@ -516,7 +640,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return;
 
         try {
-            GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key),
+            GridDhtLocalPartition part = ctx.group().topology().localPartition(ctx.affinity().partition(key),
                 AffinityTopologyVersion.NONE, true);
 
             // Reserve to make sure that partition does not get unloaded.
@@ -578,7 +702,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         long sum = 0;
 
         for (GridDhtLocalPartition p : topology().currentLocalPartitions())
-            sum += p.dataStore().size();
+            sum += p.dataStore().cacheSize(ctx.cacheId());
 
         return sum;
     }
@@ -596,7 +720,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
         for (GridDhtLocalPartition p : topology().currentLocalPartitions()) {
             if (p.primary(topVer))
-                sum += p.dataStore().size();
+                sum += p.dataStore().cacheSize(ctx.cacheId());
         }
 
         return sum;
@@ -811,7 +935,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                         res = new GridNearSingleGetResponse(ctx.cacheId(),
                             req.futureId(),
-                            req.topologyVersion(),
+                            null,
                             res0,
                             false,
                             req.addDeploymentInfo());
@@ -820,9 +944,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                             res.setContainsValue();
                     }
                     else {
-                        AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion();
+                        AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().topologyVersion();
 
-                        assert topVer.compareTo(req.topologyVersion()) >= 0 : "Wrong ready topology version for " +
+                        assert topVer.compareTo(req.topologyVersion()) > 0 : "Wrong ready topology version for " +
                             "invalid partitions response [topVer=" + topVer + ", req=" + req + ']';
 
                         res = new GridNearSingleGetResponse(ctx.cacheId(),
@@ -910,9 +1034,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 }
 
                 if (!F.isEmpty(fut.invalidPartitions()))
-                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().readyAffinityVersion());
-                else
-                    res.invalidPartitions(fut.invalidPartitions(), req.topologyVersion());
+                    res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().lastTopologyFuture().topologyVersion());
 
                 try {
                     ctx.io().send(nodeId, res, ctx.ioPolicy());
@@ -1098,7 +1220,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             false);
 
         if (part != null)
-            part.onDeferredDelete(entry.key(), ver);
+            part.onDeferredDelete(entry.context().cacheId(), entry.key(), ver);
     }
 
     /**
@@ -1110,8 +1232,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         if (expVer.equals(curVer))
             return false;
 
-        Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), expVer);
-        Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.cacheId(), curVer);
+        Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), expVer);
+        Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer);
 
         if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
             return true;
@@ -1149,7 +1271,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param topVer Specified affinity topology version.
      * @return Local entries iterator.
      */
-    public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
+    private Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary,
         final boolean backup,
         final boolean keepBinary,
         final AffinityTopologyVersion topVer) {
@@ -1163,7 +1285,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param topVer Specified affinity topology version.
      * @return Local entries iterator.
      */
-    public Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary,
+    private Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary,
         final boolean backup,
         final AffinityTopologyVersion topVer) {
         assert primary || backup;
@@ -1210,7 +1332,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                                 GridDhtLocalPartition part = partIt.next();
 
                                 if (primary == part.primary(topVer)) {
-                                    curIt = part.entries().iterator();
+                                    curIt = part.entries(ctx.cacheId()).iterator();
 
                                     break;
                                 }
@@ -1255,4 +1377,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return topVer;
         }
     }
+
+    /**
+     * Message closure that resolves the sender node and skips messages whose sender cannot be resolved.
+     */
+    protected abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void apply(UUID nodeId, M msg) {
+            ClusterNode node = ctx.node(nodeId);
+
+            if (node == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
+
+            onMessage(node, msg);
+        }
+
+        /**
+         * @param node Sender node (resolved from the sender ID, never {@code null} here).
+         * @param msg Message.
+         */
+        protected abstract void onMessage(ClusterNode node, M msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be7805f..2e86fb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -93,8 +93,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected long nextPartCounter() {
-        return locPart.nextUpdateCounter();
+    @Override protected long nextPartitionCounter(AffinityTopologyVersion topVer,
+        boolean primary,
+        @Nullable Long primaryCntr) {
+        return locPart.nextUpdateCounter(cctx.cacheId(), topVer, primary, primaryCntr);
     }
 
     /** {@inheritDoc} */
@@ -139,7 +141,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         assert !Thread.holdsLock(this);
 
         // Remove this entry from partition mapping.
-        cctx.dht().topology().onRemoved(this);
+        cctx.topology().onRemoved(this);
     }
 
     /**
@@ -715,8 +717,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     /**
      * @return Cache name.
      */
-    protected String cacheName() {
-        return cctx.dht().near().name();
+    protected final String cacheName() {
+        return cctx.name();
     }
 
     /** {@inheritDoc} */
@@ -726,12 +728,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
     /** {@inheritDoc} */
     @Override protected void incrementMapPublicSize() {
-        locPart.incrementPublicSize(this);
+        locPart.incrementPublicSize(null, this);
     }
 
     /** {@inheritDoc} */
     @Override protected void decrementMapPublicSize() {
-        locPart.decrementPublicSize(this);
+        locPart.decrementPublicSize(null, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7bc17a1..8031c8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -166,7 +166,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      * Initializes future.
      */
     void init() {
-        GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+        GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer);
 
         if (fut != null) {
             if (!F.isEmpty(fut.invalidPartitions())) {
@@ -292,9 +292,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
      */
     private boolean map(KeyCacheObject key) {
         try {
+            int keyPart = cctx.affinity().partition(key);
+
             GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
-                cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
-                cache().topology().localPartition(key, false);
+                cache().topology().localPartition(keyPart, topVer, true) :
+                cache().topology().localPartition(keyPart);
 
             if (part == null)
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 2da26ac..b9007ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -87,8 +87,8 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
     /** Topology version .*/
     private AffinityTopologyVersion topVer;
 
-    /** Retries because ownership changed. */
-    private Collection<Integer> retries;
+    /** Retry because ownership changed. */
+    private Integer retry;
 
     /** Subject ID. */
     private UUID subjId;
@@ -194,17 +194,21 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      *
      */
     private void map() {
-        if (cctx.dht().dhtPreloader().needForceKeys()) {
-            GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+        if (cctx.group().preloader().needForceKeys()) {
+            GridDhtFuture<Object> fut = cctx.group().preloader().request(
+                cctx,
                 Collections.singleton(key),
                 topVer);
 
             if (fut != null) {
-                if (F.isEmpty(fut.invalidPartitions())) {
-                    if (retries == null)
-                        retries = new HashSet<>();
+                if (!F.isEmpty(fut.invalidPartitions())) {
+                    assert fut.invalidPartitions().size() == 1 : fut.invalidPartitions();
 
-                    retries.addAll(fut.invalidPartitions());
+                    retry = F.first(fut.invalidPartitions());
+
+                    onDone((GridCacheEntryInfo)null);
+
+                    return;
                 }
 
                 fut.listen(
@@ -239,17 +243,14 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      *
      */
     private void map0() {
-        // Assign keys to primary nodes.
-        int part = cctx.affinity().partition(key);
+        assert retry == null : retry;
 
-        if (retries == null || !retries.contains(part)) {
-            if (!map(key)) {
-                retries = Collections.singleton(part);
+        if (!map(key)) {
+            retry = cctx.affinity().partition(key);
 
-                onDone((GridCacheEntryInfo)null);
+            onDone((GridCacheEntryInfo)null);
 
-                return;
-            }
+            return;
         }
 
         getAsync();
@@ -257,7 +258,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return retries == null ? Collections.<Integer>emptyList() : retries;
+        return retry == null ? Collections.<Integer>emptyList() : Collections.singletonList(retry);
     }
 
     /**
@@ -265,9 +266,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
      * @return {@code True} if mapped.
      */
     private boolean map(KeyCacheObject key) {
+        int keyPart = cctx.affinity().partition(key);
+
         GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
-            cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
-            cache().topology().localPartition(key, false);
+            cache().topology().localPartition(keyPart, topVer, true) :
+            cache().topology().localPartition(keyPart);
 
         if (part == null)
             return false;


Mime
View raw message