ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [11/18] ignite git commit: ignite-5075 'logical' caches sharing the same 'physical' cache group
Date Sun, 04 Jun 2017 08:03:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 955ca69..a25d794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.cache.database;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.DataPageEvictionMode;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
 import org.apache.ignite.internal.processors.cache.IncompleteObject;
@@ -94,25 +93,26 @@ public class CacheDataRowAdapter implements CacheDataRow {
     /**
      * Read row from data pages.
      *
-     * @param cctx Cache context.
+     * @param grp Cache group.
      * @param rowData Required row data.
      * @throws IgniteCheckedException If failed.
      */
-    public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException {
-        initFromLink(cctx, cctx.shared(), cctx.memoryPolicy().pageMemory(), rowData);
+    public final void initFromLink(CacheGroupContext grp, RowData rowData) throws IgniteCheckedException {
+        initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData);
     }
 
     /**
      * Read row from data pages.
      * Can be called with cctx == null, if cache instance is unknown, but its ID is stored in the data row.
      *
-     * @param cctx Cctx.
+     * @param grp Cache group.
      * @param sharedCtx Shared context.
      * @param pageMem Page memory.
      * @param rowData Row data.
+     * @throws IgniteCheckedException If failed.
      */
     public final void initFromLink(
-        @Nullable GridCacheContext<?, ?> cctx,
+        @Nullable CacheGroupContext grp,
         GridCacheSharedContext<?, ?> sharedCtx,
         PageMemory pageMem,
         RowData rowData)
@@ -120,14 +120,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
         assert link != 0 : "link";
         assert key == null : "key";
 
-        CacheObjectContext coctx = null;
-
-        if (cctx != null) {
-            cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
-                cctx.cacheId() : 0; // Force cacheId reading for evictable memory policies.
+        CacheObjectContext coctx = grp != null ?  grp.cacheObjectContext() : null;
 
-            coctx = cctx.cacheObjectContext();
-        }
+        boolean readCacheId = grp == null || grp.storeCacheIdInDataPage();
 
         long nextLink = link;
         IncompleteObject<?> incomplete = null;
@@ -135,10 +130,16 @@ public class CacheDataRowAdapter implements CacheDataRow {
 
         do {
             final long pageId = pageId(nextLink);
-            final long page = pageMem.acquirePage(cacheId, pageId);
+
+            // Group is null if try evict page, with persistence evictions should be disabled.
+            assert grp != null || !sharedCtx.database().persistenceEnabled();
+
+            int grpId = grp != null ? grp.groupId() : 0;
+
+            final long page = pageMem.acquirePage(grpId, pageId);
 
             try {
-                long pageAddr = pageMem.readLock(cacheId, pageId, page); // Non-empty data page must not be recycled.
+                long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled.
 
                 assert pageAddr != 0L : nextLink;
 
@@ -154,7 +155,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
                     if (first) {
                         if (nextLink == 0) {
                             // Fast path for a single page row.
-                            readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData);
+                            readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
 
                             return;
                         }
@@ -169,17 +170,17 @@ public class CacheDataRowAdapter implements CacheDataRow {
 
                     boolean keyOnly = rowData == RowData.KEY_ONLY;
 
-                    incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, incomplete);
+                    incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
 
                     if (keyOnly && key != null)
                         return;
                 }
                 finally {
-                    pageMem.readUnlock(cacheId, pageId, page);
+                    pageMem.readUnlock(grpId, pageId, page);
                 }
             }
             finally {
-                pageMem.releasePage(cacheId, pageId, page);
+                pageMem.releasePage(grpId, pageId, page);
             }
         }
         while(nextLink != 0);
@@ -188,9 +189,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param sharedCtx Cache shared context.
      * @param coctx Cache object context.
      * @param buf Buffer.
      * @param keyOnly {@code true} If need to read only key object.
+     * @param readCacheId {@code true} If need to read cache ID.
      * @param incomplete Incomplete object.
      * @throws IgniteCheckedException If failed.
      * @return Read object.
@@ -200,9 +203,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
         CacheObjectContext coctx,
         ByteBuffer buf,
         boolean keyOnly,
+        boolean readCacheId,
         IncompleteObject<?> incomplete
     ) throws IgniteCheckedException {
-        if (cacheId == 0) {
+        if (readCacheId && cacheId == 0) {
             incomplete = readIncompleteCacheId(buf, incomplete);
 
             if (cacheId == 0)
@@ -211,8 +215,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
             incomplete = null;
         }
 
-        if (coctx == null)
+        if (coctx == null) {
+            // coctx can be null only when grp is null too, this means that
+            // we are in process of eviction and cacheId is mandatory part of data.
+            assert cacheId != 0;
+
             coctx = sharedCtx.cacheContext(cacheId).cacheObjectContext();
+        }
 
         // Read key.
         if (key == null) {
@@ -251,20 +260,23 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param sharedCtx Cache shared context.
      * @param coctx Cache object context.
      * @param addr Address.
      * @param rowData Required row data.
+     * @param readCacheId {@code true} If need to read cache ID.
      * @throws IgniteCheckedException If failed.
      */
     private void readFullRow(
         GridCacheSharedContext<?, ?> sharedCtx,
         CacheObjectContext coctx,
         long addr,
-        RowData rowData)
+        RowData rowData,
+        boolean readCacheId)
         throws IgniteCheckedException {
         int off = 0;
 
-        if (cacheId == 0) {
+        if (readCacheId) {
             cacheId = PageUtils.getInt(addr, off);
 
             off += 4;
@@ -328,6 +340,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
             if (remaining >= size) {
                 cacheId = buf.getInt();
 
+                assert cacheId != 0;
+
                 return null;
             }
 
@@ -342,6 +356,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
             timeBuf.order(buf.order());
 
             cacheId = timeBuf.getInt();
+
+            assert cacheId != 0;
         }
 
         return incomplete;
@@ -401,7 +417,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
      * @param buf Buffer.
      * @param incomplete Incomplete object.
      * @return Incomplete object.
-     * @throws IgniteCheckedException If failed.
      */
     private IncompleteObject<?> readIncompleteExpireTime(
         ByteBuffer buf,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
index d51cf0e..6e429c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
@@ -37,4 +37,9 @@ public interface CacheSearchRow {
      * @return Key hash code.
      */
     public int hash();
+
+    /**
+     * @return Cache ID or {@code 0} if cache ID is not defined.
+     */
+    public int cacheId();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index fd5e2a2..19c25aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -730,9 +731,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
-     * @param stoppedCtxs A collection of tuples (cache context, destroy flag).
+     * @param stoppedGrps A collection of tuples (cache group, destroy flag).
      */
-    public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) {
+    public void onCacheGroupsStopped(Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps) {
         // No-op.
     }
 
@@ -768,12 +769,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
 
     /**
      * Reserve update history for preloading.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param partId Partition Id.
      * @param cntr Update counter.
      * @return True if successfully reserved.
      */
-    public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) {
+    public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java
index 5b87cf7..91957db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheSnapshotManager.java
@@ -64,49 +64,49 @@ public class IgniteCacheSnapshotManager extends GridCacheSharedManagerAdapter {
      *
      */
     public void restoreState() throws IgniteCheckedException {
-
+        // No-op.
     }
 
     /**
      *
      */
     public void onCheckPointBegin() {
-
+        // No-op.
     }
 
     /**
      *
      */
     public void beforeCheckpointPageWritten() {
-
+        // No-op.
     }
 
     /**
      *
      */
     public void afterCheckpointPageWritten() {
-
+        // No-op.
     }
 
     /**
      * @param fullId Full id.
      */
     public void beforePageWrite(FullPageId fullId) {
-
+        // No-op.
     }
 
     /**
      * @param fullId Full id.
      */
     public void onPageWrite(FullPageId fullId, ByteBuffer tmpWriteBuf) {
-
+        // No-op.
     }
 
     /**
      * @param cctx Cctx.
      */
     public void onCacheStop(GridCacheContext cctx) {
-
+        // No-op.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java
index 91fed4c..c21b818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetaStore.java
@@ -29,7 +29,7 @@ public interface MetaStore {
      * @param idxName Index name.
      * @return {@link RootPage} that keeps pageId, allocated flag that shows whether the page
      *      was newly allocated, and rootId that is counter which increments each time new page allocated.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException If failed.
      */
     public RootPage getOrAllocateForTree(String idxName) throws IgniteCheckedException;
 
@@ -38,14 +38,14 @@ public interface MetaStore {
      *
      * @param idxName Index name.
      * @return Root ID or -1 if no page was removed.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException  If failed.
      */
     public RootPage dropRootPage(String idxName) throws IgniteCheckedException;
 
     /**
      * Destroy this meta store.
      *
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException  If failed.
      */
     public void destroy() throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index ca4ad05..139bf73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -53,8 +53,8 @@ public class MetadataStorage implements MetaStore {
     /** Meta page reuse tree. */
     private final ReuseList reuseList;
 
-    /** Cache ID. */
-    private final int cacheId;
+    /** Cache group ID. */
+    private final int grpId;
 
     /** */
     private final int allocPartId;
@@ -70,7 +70,7 @@ public class MetadataStorage implements MetaStore {
         final PageMemory pageMem,
         final IgniteWriteAheadLogManager wal,
         final AtomicLong globalRmvId,
-        final int cacheId,
+        final int grpId,
         final int allocPartId,
         final byte allocSpace,
         final ReuseList reuseList,
@@ -79,12 +79,12 @@ public class MetadataStorage implements MetaStore {
     ) {
         try {
             this.pageMem = pageMem;
-            this.cacheId = cacheId;
+            this.grpId = grpId;
             this.allocPartId = allocPartId;
             this.allocSpace = allocSpace;
             this.reuseList = reuseList;
 
-            metaTree = new MetaTree(cacheId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId,
+            metaTree = new MetaTree(grpId, allocPartId, allocSpace, pageMem, wal, globalRmvId, rootPageId,
                 reuseList, MetaStoreInnerIO.VERSIONS, MetaStoreLeafIO.VERSIONS, initNew);
         }
         catch (IgniteCheckedException e) {
@@ -111,14 +111,14 @@ public class MetadataStorage implements MetaStore {
                 if (reuseList != null)
                     pageId = reuseList.takeRecycledPage();
 
-                pageId = pageId == 0 ? pageMem.allocatePage(cacheId, allocPartId, allocSpace) : pageId;
+                pageId = pageId == 0 ? pageMem.allocatePage(grpId, allocPartId, allocSpace) : pageId;
 
                 tree.put(new IndexItem(idxNameBytes, pageId));
 
-                return new RootPage(new FullPageId(pageId, cacheId), true);
+                return new RootPage(new FullPageId(pageId, grpId), true);
             }
             else {
-                final FullPageId pageId = new FullPageId(row.pageId, cacheId);
+                final FullPageId pageId = new FullPageId(row.pageId, grpId);
 
                 return new RootPage(pageId, false);
             }
@@ -134,10 +134,10 @@ public class MetadataStorage implements MetaStore {
 
         if (row != null) {
             if (reuseList == null)
-                pageMem.freePage(cacheId, row.pageId);
+                pageMem.freePage(grpId, row.pageId);
         }
 
-        return row != null ? new RootPage(new FullPageId(row.pageId, cacheId), false) : null;
+        return row != null ? new RootPage(new FullPageId(row.pageId, grpId), false) : null;
     }
 
     /** {@inheritDoc} */
@@ -288,7 +288,7 @@ public class MetadataStorage implements MetaStore {
         PageUtils.putByte(dstPageAddr, dstOff, len);
         dstOff++;
 
-        PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len);
+        PageHandler.copyMemory(srcPageAddr, dstPageAddr, srcOff, dstOff, len);
         srcOff += len;
         dstOff += len;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
index 563902b..d707869 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
@@ -19,8 +19,9 @@ package org.apache.ignite.internal.processors.cache.database;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
 
 /**
@@ -31,27 +32,29 @@ public class RowStore {
     private final FreeList freeList;
 
     /** */
-    protected final PageMemory pageMem;
+    private final GridCacheSharedContext ctx;
 
     /** */
-    protected final GridCacheContext<?,?> cctx;
+    protected final PageMemory pageMem;
 
     /** */
     protected final CacheObjectContext coctx;
 
+
+
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      * @param freeList Free list.
      */
-    public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) {
-        assert cctx != null;
+    public RowStore(CacheGroupContext grp, FreeList freeList) {
+        assert grp != null;
         assert freeList != null;
 
-        this.cctx = cctx;
         this.freeList = freeList;
 
-        coctx = cctx.cacheObjectContext();
-        pageMem = cctx.memoryPolicy().pageMemory();
+        ctx = grp.shared();
+        coctx = grp.cacheObjectContext();
+        pageMem = grp.memoryPolicy().pageMemory();
     }
 
     /**
@@ -60,13 +63,13 @@ public class RowStore {
      */
     public void removeRow(long link) throws IgniteCheckedException {
         assert link != 0;
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
         try {
             freeList.removeDataRowByLink(link);
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 
@@ -75,13 +78,13 @@ public class RowStore {
      * @throws IgniteCheckedException If failed.
      */
     public void addRow(CacheDataRow row) throws IgniteCheckedException {
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
         try {
             freeList.insertDataRow(row);
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
+            ctx.database().checkpointReadUnlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index 2465d3f..d92f811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -884,11 +884,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
     /**
      * @param upper Upper bound.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    private GridCursor<T> findLowerUnbounded(L upper) throws IgniteCheckedException {
-        ForwardCursor cursor = new ForwardCursor(null, upper);
+    private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException {
+        ForwardCursor cursor = new ForwardCursor(null, upper, x);
 
         long firstPageId;
 
@@ -933,14 +934,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    @Override public final GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+    @Override public GridCursor<T> find(L lower, L upper) throws IgniteCheckedException {
+        return find(lower, upper, null);
+    }
+
+    /**
+     * @param lower Lower bound inclusive or {@code null} if unbounded.
+     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
             if (lower == null)
-                return findLowerUnbounded(upper);
+                return findLowerUnbounded(upper, x);
 
-            ForwardCursor cursor = new ForwardCursor(lower, upper);
+            ForwardCursor cursor = new ForwardCursor(lower, upper, x);
 
             cursor.find();
 
@@ -4392,6 +4404,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         private final L upperBound;
 
+        /** */
+        private final Object x;
+
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
@@ -4399,6 +4414,18 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         ForwardCursor(L lowerBound, L upperBound) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
+            this.x = null;
+        }
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         */
+        ForwardCursor(L lowerBound, L upperBound, Object x) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+            this.x = x;
         }
 
         /**
@@ -4515,7 +4542,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                 rows = (T[])new Object[cnt];
 
             for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i);
+                T r = getRow(io, pageAddr, startIdx + i, x);
 
                 rows = GridArrays.set(rows, i, r);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index 2586696..e40ed11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -157,6 +157,21 @@ public abstract class PageIO {
     /** */
     public static final short T_PAGE_UPDATE_TRACKING = 15;
 
+    /** */
+    public static final short T_CACHE_ID_AWARE_DATA_REF_INNER = 16;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_DATA_REF_LEAF = 17;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_PENDING_REF_INNER = 18;
+
+    /** */
+    public static final short T_CACHE_ID_AWARE_PENDING_REF_LEAF = 19;
+
+    /** */
+    public static final short T_PART_CNTRS = 20;
+
     /** Index for payload == 1. */
     public static final short T_H2_EX_REF_LEAF_START = 10000;
 
@@ -430,6 +445,9 @@ public abstract class PageIO {
             case T_PART_META:
                 return (Q)PagePartitionMetaIO.VERSIONS.forVersion(ver);
 
+            case T_PART_CNTRS:
+                return (Q)PagePartitionCountersIO.VERSIONS.forVersion(ver);
+
             case T_PAGE_UPDATE_TRACKING:
                 return (Q)TrackingPageIO.VERSIONS.forVersion(ver);
 
@@ -484,6 +502,12 @@ public abstract class PageIO {
             case T_DATA_REF_LEAF:
                 return (Q)IgniteCacheOffheapManagerImpl.DataLeafIO.VERSIONS.forVersion(ver);
 
+            case T_CACHE_ID_AWARE_DATA_REF_INNER:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataInnerIO.VERSIONS.forVersion(ver);
+
+            case T_CACHE_ID_AWARE_DATA_REF_LEAF:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataLeafIO.VERSIONS.forVersion(ver);
+
             case T_METASTORE_INNER:
                 return (Q)MetadataStorage.MetaStoreInnerIO.VERSIONS.forVersion(ver);
 
@@ -496,6 +520,12 @@ public abstract class PageIO {
             case T_PENDING_REF_LEAF:
                 return (Q)IgniteCacheOffheapManagerImpl.PendingEntryLeafIO.VERSIONS.forVersion(ver);
 
+            case T_CACHE_ID_AWARE_PENDING_REF_INNER:
+                return (Q) IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryInnerIO.VERSIONS.forVersion(ver);
+
+            case T_CACHE_ID_AWARE_PENDING_REF_LEAF:
+                return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryLeafIO.VERSIONS.forVersion(ver);
+
             default:
                 // For tests.
                 if (innerTestIO != null && innerTestIO.getType() == type && innerTestIO.getVersion() == ver)

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
new file mode 100644
index 0000000..015b8ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionCountersIO.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.database.tree.io;
+
+import java.util.Map;
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ *
+ */
+public class PagePartitionCountersIO extends PageIO {
+    /** */
+    private static final int CNT_OFF = COMMON_HEADER_END;
+
+    /** */
+    private static final int LAST_FLAG_OFF = CNT_OFF + 2;
+
+    /** */
+    private static final int NEXT_COUNTERS_PAGE_OFF = LAST_FLAG_OFF + 1;
+
+    /** */
+    private static final int ITEMS_OFF = NEXT_COUNTERS_PAGE_OFF + 8;
+
+    /** */
+    private static final int ITEM_SIZE = 12;
+
+    /** */
+    private static final byte LAST_FLAG = 0b1;
+
+    /** */
+    public static final IOVersions<PagePartitionCountersIO> VERSIONS = new IOVersions<>(
+        new PagePartitionCountersIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    public PagePartitionCountersIO(int ver) {
+        super(T_PART_CNTRS, ver);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initNewPage(long pageAddr, long pageId, int pageSize) {
+        super.initNewPage(pageAddr, pageId, pageSize);
+
+        setCount(pageAddr, 0);
+        setNextCountersPageId(pageAddr, 0);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Next counters page ID or {@code 0} if it does not exist.
+     */
+    public long getNextCountersPageId(long pageAddr) {
+        return PageUtils.getLong(pageAddr, NEXT_COUNTERS_PAGE_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param partMetaPageId Next counters page ID.
+     */
+    public void setNextCountersPageId(long pageAddr, long partMetaPageId) {
+        PageUtils.putLong(pageAddr, NEXT_COUNTERS_PAGE_OFF, partMetaPageId);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @param pageAddr Page address.
+     * @param cacheSizes Serialized cache size items (pairs of cache ID and its size).
+     * @return Number of written pairs.
+     */
+    public int writeCacheSizes(int pageSize, long pageAddr, byte[] cacheSizes, int itemsOff) {
+        assert cacheSizes != null;
+        assert cacheSizes.length % ITEM_SIZE == 0 : cacheSizes.length;
+
+        int cap = getCapacity(pageSize);
+
+        int items = (cacheSizes.length / ITEM_SIZE) - itemsOff;
+        int write = Math.min(cap, items);
+
+        if (write > 0)
+            // This can happen in case there are no items in a given partition for all caches in the group.
+            PageUtils.putBytes(pageAddr, ITEMS_OFF, cacheSizes, itemsOff * ITEM_SIZE, write * ITEM_SIZE);
+
+        setCount(pageAddr, write);
+
+        setLastFlag(pageAddr, write == items);
+
+        return write;
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param res Result map of cache sizes.
+     * @return {@code True} if the map was fully read.
+     */
+    public boolean readCacheSizes(long pageAddr, Map<Integer, Long> res) {
+        int cnt = getCount(pageAddr);
+
+        assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt;
+
+        if (cnt == 0)
+            return true;
+
+        int off = ITEMS_OFF;
+
+        for (int i = 0; i < cnt; i++) {
+            int cacheId = PageUtils.getInt(pageAddr, off);
+            off += 4;
+
+            assert cacheId != 0;
+
+            long cacheSize = PageUtils.getLong(pageAddr, off);
+            off += 8;
+
+            assert cacheSize > 0 : cacheSize;
+
+            Long old = res.put(cacheId, cacheSize);
+
+            assert old == null;
+        }
+
+        return getLastFlag(pageAddr);
+    }
+
+    private boolean getLastFlag(long pageAddr) {
+        return PageUtils.getByte(pageAddr, LAST_FLAG_OFF) == LAST_FLAG;
+    }
+
+    private void setLastFlag(long pageAddr, boolean last) {
+        PageUtils.putByte(pageAddr, LAST_FLAG_OFF, last ? LAST_FLAG : ~LAST_FLAG);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Stored items count.
+     */
+    private int getCount(long pageAddr) {
+        return PageUtils.getShort(pageAddr, CNT_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param cnt Stored items count.
+     */
+    private void setCount(long pageAddr, int cnt) {
+        assert cnt >= 0 && cnt <= Short.MAX_VALUE : cnt;
+
+        PageUtils.putShort(pageAddr, CNT_OFF, (short)cnt);
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @return Maximum number of items which can be stored in buffer.
+     */
+    private int getCapacity(int pageSize) {
+        return (pageSize - ITEMS_OFF) / ITEM_SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
index aca0725..67cc5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java
@@ -37,6 +37,9 @@ public class PagePartitionMetaIO extends PageMetaIO {
     private static final int PARTITION_STATE_OFF = GLOBAL_RMV_ID_OFF + 8;
 
     /** */
+    private static final int NEXT_PART_META_PAGE_OFF = PARTITION_STATE_OFF + 1;
+
+    /** */
     public static final IOVersions<PagePartitionMetaIO> VERSIONS = new IOVersions<>(
         new PagePartitionMetaIO(1)
     );
@@ -49,6 +52,7 @@ public class PagePartitionMetaIO extends PageMetaIO {
         setUpdateCounter(pageAddr, 0);
         setGlobalRemoveId(pageAddr, 0);
         setPartitionState(pageAddr, (byte)-1);
+        setCountersPageId(pageAddr, 0);
     }
 
     /**
@@ -120,4 +124,20 @@ public class PagePartitionMetaIO extends PageMetaIO {
     public void setPartitionState(long pageAddr, byte state) {
         PageUtils.putByte(pageAddr, PARTITION_STATE_OFF, state);
     }
+
+    /**
+     * @param pageAddr Page address.
+     * @return Next meta partial page ID or {@code 0} if it does not exist.
+     */
+    public long getCountersPageId(long pageAddr) {
+        return PageUtils.getLong(pageAddr, NEXT_PART_META_PAGE_OFF);
+    }
+
+    /**
+     * @param pageAddr Page address.
+     * @param metaPageId Next partial meta page ID.
+     */
+    public void setCountersPageId(long pageAddr, long metaPageId) {
+        PageUtils.putLong(pageAddr, NEXT_PART_META_PAGE_OFF, metaPageId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 5d1885e..c092132 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class GridCacheTtlUpdateRequest extends GridCacheMessage {
+public class GridCacheTtlUpdateRequest extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index 630c79f..fc209aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -24,7 +24,7 @@ import java.util.Collections;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -37,7 +37,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Base for all messages in replicated cache.
  */
-public abstract class GridDistributedBaseMessage extends GridCacheMessage implements GridCacheDeployable,
+public abstract class GridDistributedBaseMessage extends GridCacheIdMessage implements GridCacheDeployable,
     GridCacheVersionable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index c966877..dc9e4ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -79,10 +79,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
     /**
      * @param ctx Cache registry.
-     * @param startSize Start size.
      */
-    protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx, int startSize) {
-        super(ctx, startSize);
+    protected GridDistributedCacheAdapter(GridCacheContext<K, V> ctx) {
+        super(ctx);
     }
 
     /**
@@ -279,11 +278,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
             IgniteCacheOffheapManager offheap = ctx.offheap();
 
             if (modes.offheap)
-                size += offheap.entriesCount(modes.primary, modes.backup, topVer);
+                size += offheap.cacheEntriesCount(ctx.cacheId(), modes.primary, modes.backup, topVer);
             else if (modes.heap) {
                 for (GridDhtLocalPartition locPart : ctx.topology().currentLocalPartitions()) {
                     if ((modes.primary && locPart.primary(topVer)) || (modes.backup && locPart.backup(topVer)))
-                        size += locPart.publicSize();
+                        size += locPart.publicSize(ctx.cacheId());
                 }
             }
         }
@@ -308,7 +307,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
             if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary ||
                 ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup)
-                size += offheap.entriesCount(part);
+                size += offheap.cacheEntriesCount(ctx.cacheId(), part);
         }
 
         return size;
@@ -460,7 +459,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                             return false;
 
                         try {
-                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().keysIterator(part);
+                            GridCloseableIterator<KeyCacheObject> iter = dht.context().offheap().cacheKeysIterator(ctx.cacheId(), part);
 
                             if (iter != null) {
                                 try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 561c292..c36e633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -71,6 +71,16 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public int handlerId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public final int partition() {
         return part;
     }
@@ -135,25 +145,25 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeMessage("txId", txId))
                     return false;
 
@@ -175,7 +185,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -183,7 +193,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
@@ -191,7 +201,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 part = reader.readInt("part");
 
                 if (!reader.isLastRead())
@@ -199,7 +209,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 txId = reader.readMessage("txId");
 
                 if (!reader.isLastRead())
@@ -219,7 +229,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 6;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 714d781..5e3020d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -594,6 +594,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                         dhtVer,
                                                         txEntry.updateCounter());
                                                 else {
+                                                    assert val != null : txEntry;
+
                                                     cached.innerSet(this,
                                                         eventNodeId(),
                                                         nodeId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 76c7a15..3b41ffa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -25,7 +25,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -38,24 +38,25 @@ import org.jetbrains.annotations.Nullable;
  * An implementation of GridCacheConcurrentMap that will delegate all method calls to corresponding local partition.
  */
 public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap {
-    /** Context. */
-    private final GridCacheContext ctx;
+    /** Cache group. */
+    private final CacheGroupContext grp;
 
     /**
-     * Constructor.
-     * @param ctx Context.
+     * @param grp Cache group.
      */
-    public GridCachePartitionedConcurrentMap(GridCacheContext ctx) {
-        this.ctx = ctx;
+    GridCachePartitionedConcurrentMap(CacheGroupContext grp) {
+        this.grp = grp;
     }
 
     /**
+     * @param cctx Cache context.
      * @param key Key.
      * @param topVer Topology version.
      * @param create Create flag.
      * @return Local partition.
      */
     @Nullable private GridDhtLocalPartition localPartition(
+        GridCacheContext cctx,
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean create
@@ -63,33 +64,33 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
         int p = key.partition();
 
         if (p == -1)
-            p = ctx.affinity().partition(key);
+            p = cctx.affinity().partition(key);
 
-        return ctx.topology().localPartition(p, topVer, create);
+        return grp.topology().localPartition(p, topVer, create);
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
-        GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false);
+    @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
+        GridDhtLocalPartition part = localPartition(ctx, key, AffinityTopologyVersion.NONE, false);
 
         if (part == null)
             return null;
 
-        return part.getEntry(key);
+        return part.getEntry(ctx, key);
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer,
+    @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer,
         KeyCacheObject key,
         boolean create,
         boolean touch) {
         while (true) {
-            GridDhtLocalPartition part = localPartition(key, topVer, create);
+            GridDhtLocalPartition part = localPartition(ctx, key, topVer, create);
 
             if (part == null)
                 return null;
 
-            GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch);
+            GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(ctx, topVer, key, create, touch);
 
             if (res != null || !create)
                 return res;
@@ -102,35 +103,35 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     @Override public int internalSize() {
         int size = 0;
 
-        for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+        for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
             size += part.internalSize();
 
         return size;
     }
 
     /** {@inheritDoc} */
-    @Override public int publicSize() {
+    @Override public int publicSize(int cacheId) {
         int size = 0;
 
-        for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
-            size += part.publicSize();
+        for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
+            size += part.publicSize(cacheId);
 
         return size;
     }
 
     /** {@inheritDoc} */
-    @Override public void incrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+    @Override public void incrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
-    @Override public void decrementPublicSize(GridCacheEntryEx e) {
-        localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+    @Override public void decrementPublicSize(CacheMapHolder hld, GridCacheEntryEx e) {
+        localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(hld, e);
     }
 
     /** {@inheritDoc} */
     @Override public boolean removeEntry(GridCacheEntryEx entry) {
-        GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false);
+        GridDhtLocalPartition part = localPartition(entry.context(), entry.key(), AffinityTopologyVersion.NONE, false);
 
         if (part == null)
             return false;
@@ -139,12 +140,12 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     }
 
     /** {@inheritDoc} */
-    @Override public Iterable<GridCacheMapEntry> entries(final CacheEntryPredicate... filter) {
+    @Override public Iterable<GridCacheMapEntry> entries(final int cacheId, final CacheEntryPredicate... filter) {
         return new Iterable<GridCacheMapEntry>() {
             @Override public Iterator<GridCacheMapEntry> iterator() {
                 return new PartitionedIterator<GridCacheMapEntry>() {
                     @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) {
-                        return part.entries(filter).iterator();
+                        return part.entries(cacheId, filter).iterator();
                     }
                 };
             }
@@ -152,23 +153,10 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
     }
 
     /** {@inheritDoc} */
-    @Override public Iterable<GridCacheMapEntry> allEntries(final CacheEntryPredicate... filter) {
-        return new Iterable<GridCacheMapEntry>() {
-            @Override public Iterator<GridCacheMapEntry> iterator() {
-                return new PartitionedIterator<GridCacheMapEntry>() {
-                    @Override protected Iterator<GridCacheMapEntry> iterator(GridDhtLocalPartition part) {
-                        return part.allEntries(filter).iterator();
-                    }
-                };
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
-    @Override public Set<GridCacheMapEntry> entrySet(final CacheEntryPredicate... filter) {
+    @Override public Set<GridCacheMapEntry> entrySet(final int cacheId, final CacheEntryPredicate... filter) {
         return new PartitionedSet<GridCacheMapEntry>() {
             @Override protected Set<GridCacheMapEntry> set(GridDhtLocalPartition part) {
-                return part.entrySet(filter);
+                return part.entrySet(cacheId, filter);
             }
         };
     }
@@ -178,7 +166,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
      */
     private abstract class PartitionedIterator<T> implements Iterator<T> {
         /** Partitions iterator. */
-        private Iterator<GridDhtLocalPartition> partsIter = ctx.topology().currentLocalPartitions().iterator();
+        private Iterator<GridDhtLocalPartition> partsIter = grp.topology().currentLocalPartitions().iterator();
 
         /** Current partition iterator. */
         private Iterator<T> currIter = partsIter.hasNext() ? iterator(partsIter.next()) :
@@ -242,7 +230,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
         @Override public int size() {
             int size = 0;
 
-            for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+            for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
                 size += set(part).size();
 
             return size;
@@ -250,7 +238,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
 
         /** {@inheritDoc} */
         @Override public boolean contains(Object o) {
-            for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) {
+            for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
                 if (set(part).contains(o))
                     return true;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 1482137..cace4e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -73,7 +73,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private GridCacheSharedContext cctx;
 
     /** Cache ID. */
-    private int cacheId;
+    private int grpId;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -113,18 +113,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /**
      * @param cctx Context.
-     * @param cacheId Cache ID.
+     * @param grpId Group ID.
      * @param exchFut Exchange ID.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
         GridCacheSharedContext cctx,
-        int cacheId,
+        int grpId,
         GridDhtPartitionsExchangeFuture exchFut,
         Object similarAffKey
     ) {
         this.cctx = cctx;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.similarAffKey = similarAffKey;
 
         topVer = exchFut.topologyVersion();
@@ -168,8 +168,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public int cacheId() {
-        return cacheId;
+    @Override public int groupId() {
+        return grpId;
     }
 
     /** {@inheritDoc} */
@@ -283,7 +283,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         long updateSeq = this.updateSeq.incrementAndGet();
 
         // If this is the oldest node.
-        if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheStarted(cacheId)) {
+        if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) {
             if (node2part == null) {
                 node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -361,8 +361,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
-        return localPartition(1, AffinityTopologyVersion.NONE, create);
+    @Override public GridDhtLocalPartition localPartition(int p) {
+        return localPartition(p, AffinityTopologyVersion.NONE, false);
     }
 
     /** {@inheritDoc} */
@@ -550,7 +550,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         lock.readLock().lock();
 
         try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
+            if (stopping || node2part == null)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
                 ", locNodeId=" + cctx.localNodeId() +
                 ", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
 
@@ -1035,7 +1038,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
-            ", cacheId=" + cacheId + ']');
+            ", grpId=" + grpId + ']');
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
index f80adc5..d9d642a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Affinity assignment request.
  */
-public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
+public class GridDhtAffinityAssignmentRequest extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -46,17 +46,17 @@ public class GridDhtAffinityAssignmentRequest extends GridCacheMessage {
 
     /**
      * @param futId Future ID.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      */
     public GridDhtAffinityAssignmentRequest(
         long futId,
-        int cacheId,
+        int grpId,
         AffinityTopologyVersion topVer) {
         assert topVer != null;
 
         this.futId = futId;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.topVer = topVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index 5d82171..4df3fc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -27,20 +27,18 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Affinity assignment response.
  */
-public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
+public class GridDhtAffinityAssignmentResponse extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -73,17 +71,17 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
 
     /**
      * @param futId Future ID.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment.
      */
     public GridDhtAffinityAssignmentResponse(
         long futId,
-        int cacheId,
+        int grpId,
         @NotNull AffinityTopologyVersion topVer,
         List<List<ClusterNode>> affAssignment) {
         this.futId = futId;
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.topVer = topVer;
 
         affAssignmentIds = ids(affAssignment);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 20d1722..8746320 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -74,27 +74,27 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     private final AffinityTopologyVersion topVer;
 
     /** */
-    private final int cacheId;
+    private final int grpId;
 
     /**
      * @param ctx Context.
-     * @param cacheDesc Cache descriptor.
+     * @param grpDesc Group descriptor.
      * @param topVer Topology version.
      * @param discoCache Discovery cache.
      */
     public GridDhtAssignmentFetchFuture(
         GridCacheSharedContext ctx,
-        DynamicCacheDescriptor cacheDesc,
+        CacheGroupDescriptor grpDesc,
         AffinityTopologyVersion topVer,
         DiscoCache discoCache
     ) {
-        this.ctx = ctx;
-        cacheId = cacheDesc.cacheId();
         this.topVer = topVer;
+        this.grpId = grpDesc.groupId();
+        this.ctx = ctx;
 
         id = idGen.getAndIncrement();
 
-        Collection<ClusterNode> availableNodes = discoCache.cacheAffinityNodes(cacheDesc.cacheId());
+        Collection<ClusterNode> availableNodes = discoCache.cacheGroupAffinityNodes(grpDesc.groupId());
 
         LinkedList<ClusterNode> tmp = new LinkedList<>();
 
@@ -112,10 +112,10 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
     }
 
     /**
-     * @return Cache ID.
+     * @return Cache group ID.
      */
-    public int cacheId() {
-        return cacheId;
+    public int groupId() {
+        return grpId;
     }
 
     /**
@@ -195,7 +195,7 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<GridDhtAffin
                             ", node=" + node + ']');
 
                     ctx.io().send(node,
-                        new GridDhtAffinityAssignmentRequest(id, cacheId, topVer),
+                        new GridDhtAffinityAssignmentRequest(id, grpId, topVer),
                         AFFINITY_POOL);
 
                     // Close window for listener notification.


Mime
View raw message