ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [20/31] ignite git commit: ignite-5075 Implement logical 'cache groups' sharing the same physical caches
Date Tue, 06 Jun 2017 10:01:41 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 579796d..d2dc817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -40,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** Skip store flag bit mask. */
     private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 47572fd..443b1b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -69,7 +69,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -192,19 +193,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtAtomicCacheEntry(ctx, topVer, key);
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
     @Override protected void init() {
         super.init();
 
@@ -238,11 +226,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         metrics = m;
 
-        preldr = new GridDhtPreloader(ctx);
-
-        preldr.start();
-
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearGetRequest.class,
             new CI2<UUID, GridNearGetRequest>() {
@@ -256,7 +240,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearSingleGetRequest.class,
             new CI2<UUID, GridNearSingleGetRequest>() {
@@ -270,7 +254,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridNearAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() {
@@ -289,7 +273,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridNearAtomicUpdateResponse.class,
             new CI2<UUID, GridNearAtomicUpdateResponse>() {
                 @Override public void apply(
@@ -307,7 +292,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridDhtAtomicAbstractUpdateRequest.class,
             new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
@@ -326,7 +311,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(
+        ctx.io().addCacheHandler(
             ctx.cacheId(),
             GridDhtAtomicUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicUpdateResponse>() {
@@ -345,7 +330,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridDhtAtomicDeferredUpdateResponse.class,
             new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() {
                 @Override public void apply(
@@ -363,7 +349,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridDhtAtomicNearResponse.class,
             new CI2<UUID, GridDhtAtomicNearResponse>() {
             @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
@@ -376,7 +363,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(),
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
             GridNearAtomicCheckUpdateRequest.class,
             new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
                 @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) {
@@ -389,8 +377,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addCacheHandler(
+            ctx.cacheId(),
+            GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
+
         if (near == null) {
-            ctx.io().addHandler(
+            ctx.io().addCacheHandler(
                 ctx.cacheId(),
                 GridNearGetResponse.class,
                 new CI2<UUID, GridNearGetResponse>() {
@@ -404,7 +410,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
                 });
 
-            ctx.io().addHandler(
+            ctx.io().addCacheHandler(
                 ctx.cacheId(),
                 GridNearSingleGetResponse.class,
                 new CI2<UUID, GridNearSingleGetResponse>() {
@@ -1485,7 +1491,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 // Optimistically expect that all keys are available locally (avoid creation of get future).
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key);
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -1661,7 +1667,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicAbstractUpdateRequest req,
         final UpdateReplyClosure completionCb
     ) {
-        IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
+        IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion());
 
         if (forceFut == null || forceFut.isDone()) {
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
deleted file mode 100644
index b0c9a64..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * DHT atomic cache entry.
- */
-public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry {
-    /**
-     * @param ctx Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key Cache key.
-     */
-    GridDhtAtomicCacheEntry(
-        GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key
-    ) {
-        super(ctx, topVer, key);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected String cacheName() {
-        return CU.isNearEnabled(cctx) ? super.cacheName() : cctx.dht().name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return S.toString(GridDhtAtomicCacheEntry.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 92ef149..0c069da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridLongList;
@@ -35,7 +35,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Deferred dht atomic update response.
  */
-public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicDeferredUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index d6e2db0..71d2321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -36,7 +36,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic
 /**
  * Message sent from DHT nodes to near node in FULL_SYNC mode.
  */
-public class GridDhtAtomicNearResponse extends GridCacheMessage {
+public class GridDhtAtomicNearResponse extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 693d658..7b2547a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
@@ -27,7 +26,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -39,7 +38,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * DHT atomic cache backup update response.
  */
-public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 4b3ea5bc..bb47af4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -44,7 +44,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  *
  */
-public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
index 4b9109e..96be023 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -27,7 +27,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  *
  */
-public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 55953ea..5ba024f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * DHT atomic cache near update response.
  */
-public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 12a3912..708df49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
-import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -116,35 +115,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /** {@inheritDoc} */
-    @Override protected GridCacheMapEntryFactory entryFactory() {
-        return new GridCacheMapEntryFactory() {
-            @Override public GridCacheMapEntry create(
-                GridCacheContext ctx,
-                AffinityTopologyVersion topVer,
-                KeyCacheObject key
-            ) {
-                return new GridDhtColocatedCacheEntry(ctx, topVer, key);
-            }
-        };
-    }
-
-    /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearGetResponse res) {
                 processNearGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() {
             @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) {
                 processNearSingleGetResponse(nodeId, res);
             }
         });
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
+        ctx.io().addCacheHandler(ctx.cacheId(), GridNearLockResponse.class, new CI2<UUID, GridNearLockResponse>() {
             @Override public void apply(UUID nodeId, GridNearLockResponse res) {
                 processLockResponse(nodeId, res);
             }
@@ -467,7 +453,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
                 for (KeyCacheObject key : keys) {
                     if (readNoEntry) {
-                        CacheDataRow row = ctx.offheap().read(key);
+                        CacheDataRow row = ctx.offheap().read(ctx, key);
 
                         if (row != null) {
                             long expireTime = row.expireTime();
@@ -941,7 +927,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     ) {
         assert keys != null;
 
-        IgniteInternalFuture<Object> keyFut = ctx.dht().dhtPreloader().request(keys, topVer);
+        IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer);
 
         // Prevent embedded future creation if possible.
         if (keyFut == null || keyFut.isDone()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
deleted file mode 100644
index f7cc5a7..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
-
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Cache entry for colocated cache.
- */
-public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry {
-    /**
-     * @param ctx Cache context.
-     * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
-     * @param key Cache key.
-     */
-    GridDhtColocatedCacheEntry(
-        GridCacheContext ctx,
-        AffinityTopologyVersion topVer,
-        KeyCacheObject key
-    ) {
-        super(ctx, topVer, key);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected String cacheName() {
-        return cctx.colocated().name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return S.toString(GridDhtColocatedCacheEntry.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 562a165..a5a1eb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     /** Future ID. */
     private IgniteUuid futId = IgniteUuid.randomUuid();
 
-    /** Preloader. */
-    private GridDhtPreloader preloader;
-
     /** Trackable flag. */
     private boolean trackable;
 
@@ -112,21 +109,19 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param cctx Cache context.
      * @param topVer Topology version.
      * @param keys Keys.
-     * @param preloader Preloader.
      */
     public GridDhtForceKeysFuture(
         GridCacheContext<K, V> cctx,
         AffinityTopologyVersion topVer,
-        Collection<KeyCacheObject> keys,
-        GridDhtPreloader preloader
+        Collection<KeyCacheObject> keys
     ) {
         assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
+        assert !cctx.isNear();
 
         this.cctx = cctx;
         this.keys = keys;
         this.topVer = topVer;
-        this.preloader = preloader;
 
         top = cctx.dht().topology();
 
@@ -158,7 +153,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
             if (trackable)
-                preloader.remoteFuture(this);
+                cctx.dht().removeFuture(this);
 
             return true;
         }
@@ -170,7 +165,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param evt Discovery event.
      */
     @SuppressWarnings( {"unchecked"})
-    void onDiscoveryEvent(DiscoveryEvent evt) {
+    public void onDiscoveryEvent(DiscoveryEvent evt) {
         topCntr.incrementAndGet();
 
         int type = evt.type();
@@ -244,7 +239,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             int curTopVer = topCntr.get();
 
-            if (!preloader.addFuture(this)) {
+            if (!cctx.dht().addFuture(this)) {
                 assert isDone() : this;
 
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index d129ae8..124ae44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -25,7 +25,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -40,7 +40,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  * Force keys request. This message is sent by node while preloading to force
  * another node to put given keys into the next batch of transmitting entries.
  */
-public class GridDhtForceKeysRequest extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtForceKeysRequest extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index c4c57a7..977e9ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -43,7 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Force keys response. Contains absent keys.
  */
-public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtForceKeysResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -168,7 +168,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
 
         if (infos != null) {
             for (GridCacheEntryInfo info : infos)
-                info.marshal(cctx);
+                info.marshal(cctx.cacheObjectContext());
         }
 
         if (err != null && errBytes == null)
@@ -186,7 +186,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
 
         if (infos != null) {
             for (GridCacheEntryInfo info : infos)
-                info.unmarshal(cctx, ldr);
+                info.unmarshal(cctx.cacheObjectContext(), ldr);
         }
 
         if (errBytes != null && err == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 24b1de1..04a7e97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Partition demand request.
  */
-public class GridDhtPartitionDemandMessage extends GridCacheMessage {
+public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -73,10 +73,10 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     /**
      * @param updateSeq Update sequence for this node.
      * @param topVer Topology version.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      */
-    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
-        this.cacheId = cacheId;
+    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) {
+        this.grpId = grpId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;
     }
@@ -86,7 +86,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
      * @param parts Partitions.
      */
     GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts, Map<Integer, Long> partsCntrs) {
-        cacheId = cp.cacheId;
+        grpId = cp.grpId;
         updateSeq = cp.updateSeq;
         topic = cp.topic;
         timeout = cp.timeout;

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index cdbae1a..a1b45df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -38,13 +37,16 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -78,7 +80,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final GridCacheSharedContext<?, ?> ctx;
+
+    /** */
+    private final CacheGroupContext grp;
 
     /** */
     private final IgniteLogger log;
@@ -104,30 +109,20 @@ public class GridDhtPartitionDemander {
     private final Map<Integer, Object> rebalanceTopics;
 
     /**
-     * Started event sent.
-     * Make sense for replicated cache only.
+     * @param grp Ccahe group.
      */
-    private final AtomicBoolean startedEvtSent = new AtomicBoolean();
+    public GridDhtPartitionDemander(CacheGroupContext grp) {
+        assert grp != null;
 
-    /**
-     * Stopped event sent.
-     * Make sense for replicated cache only.
-     */
-    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
+        this.grp = grp;
 
-    /**
-     * @param cctx Cctx.
-     */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+        ctx = grp.shared();
 
-        this.cctx = cctx;
+        log = ctx.logger(getClass());
 
-        log = cctx.logger(getClass());
+        boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode();
 
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
-
-        rebalanceFut = new RebalanceFuture();//Dummy.
+        rebalanceFut = new RebalanceFuture(); //Dummy.
 
         if (!enabled) {
             // Calling onDone() immediately since preloading is disabled.
@@ -137,7 +132,7 @@ public class GridDhtPartitionDemander {
 
         Map<Integer, Object> tops = new HashMap<>();
 
-        for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
+        for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++)
             tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
         rebalanceTopics = tops;
@@ -196,7 +191,7 @@ public class GridDhtPartitionDemander {
         GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
 
         if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
+            ctx.time().removeTimeoutObject(obj);
 
         final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -208,7 +203,7 @@ public class GridDhtPartitionDemander {
 
             exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);
+                    IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
 
                     fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                         @Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -237,7 +232,7 @@ public class GridDhtPartitionDemander {
      */
     private boolean topologyChanged(RebalanceFuture fut) {
         return
-            !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+            !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed.
                 fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
     }
 
@@ -268,12 +263,12 @@ public class GridDhtPartitionDemander {
 
         assert force == (forcedRebFut != null);
 
-        long delay = cctx.config().getRebalanceDelay();
+        long delay = grp.config().getRebalanceDelay();
 
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
+            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -319,7 +314,7 @@ public class GridDhtPartitionDemander {
 
                 fut.onDone(true);
 
-                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 fut.sendRebalanceFinishedEvent();
 
@@ -350,7 +345,7 @@ public class GridDhtPartitionDemander {
             GridTimeoutObject obj = lastTimeoutObj.get();
 
             if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
+                ctx.time().removeTimeoutObject(obj);
 
             final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -360,7 +355,7 @@ public class GridDhtPartitionDemander {
                 @Override public void onTimeout() {
                     exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forceRebalance(exchFut);
+                            ctx.exchange().forceRebalance(exchFut);
                         }
                     });
                 }
@@ -368,7 +363,7 @@ public class GridDhtPartitionDemander {
 
             lastTimeoutObj.set(obj);
 
-            cctx.time().addTimeoutObject(obj);
+            ctx.time().addTimeoutObject(obj);
         }
 
         return null;
@@ -399,17 +394,19 @@ public class GridDhtPartitionDemander {
 
                 Collection<Integer> parts= e.getValue().partitions();
 
-                assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+                assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]";
 
                 fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
             }
         }
 
+        final CacheConfiguration cfg = grp.config();
+
+        int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
+
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             final ClusterNode node = e.getKey();
 
-            final CacheConfiguration cfg = cctx.config();
-
             final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
 
             GridDhtPartitionDemandMessage d = e.getValue();
@@ -418,8 +415,6 @@ public class GridDhtPartitionDemander {
                 ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                 ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
-            int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
             final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
             for (int cnt = 0; cnt < lsnrCnt; cnt++)
@@ -439,16 +434,16 @@ public class GridDhtPartitionDemander {
 
                     initD.topic(rebalanceTopics.get(cnt));
                     initD.updateSequence(fut.updateSeq);
-                    initD.timeout(cctx.config().getRebalanceTimeout());
+                    initD.timeout(cfg.getRebalanceTimeout());
 
                     final int finalCnt = cnt;
 
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    ctx.kernalContext().closure().runLocalSafe(new Runnable() {
                         @Override public void run() {
                             try {
                                 if (!fut.isDone()) {
-                                    cctx.io().sendOrderedMessage(node,
-                                        rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout());
+                                    ctx.io().sendOrderedMessage(node,
+                                        rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout());
 
                                     // Cleanup required in case partitions demanded in parallel with cancellation.
                                     synchronized (fut) {
@@ -495,11 +490,11 @@ public class GridDhtPartitionDemander {
 
         for (Integer part : parts) {
             try {
-                if (cctx.shared().database().persistenceEnabled()) {
+                if (ctx.database().persistenceEnabled()) {
                     if (partCntrs == null)
                         partCntrs = new HashMap<>(parts.size(), 1.0f);
 
-                    GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false);
+                    GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
 
                     partCntrs.put(part, p.initialUpdateCounter());
                 }
@@ -575,7 +570,7 @@ public class GridDhtPartitionDemander {
 
         final RebalanceFuture fut = rebalanceFut;
 
-        ClusterNode node = cctx.node(id);
+        ClusterNode node = ctx.node(id);
 
         if (node == null)
             return;
@@ -599,14 +594,16 @@ public class GridDhtPartitionDemander {
             return;
         }
 
-        final GridDhtPartitionTopology top = cctx.dht().topology();
+        final GridDhtPartitionTopology top = grp.topology();
 
         try {
+            AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (aff.get(p).contains(ctx.localNode())) {
                     GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                     assert part != null;
@@ -617,7 +614,7 @@ public class GridDhtPartitionDemander {
                         boolean reserved = part.reserve();
 
                         assert reserved : "Failed to reserve partition [igniteInstanceName=" +
-                            cctx.igniteInstanceName() + ", cacheName=" + cctx.name() + ", part=" + part + ']';
+                            ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
 
                         part.lock();
 
@@ -676,7 +673,7 @@ public class GridDhtPartitionDemander {
 
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed()) {
-                if (cctx.affinity().partitionLocalNode(miss, topVer))
+                if (aff.get(miss).contains(ctx.localNode()))
                     fut.partitionMissed(id, miss);
             }
 
@@ -684,16 +681,18 @@ public class GridDhtPartitionDemander {
                 fut.partitionDone(id, miss);
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+                supply.updateSequence(),
+                supply.topologyVersion(),
+                grp.groupId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             d.topic(rebalanceTopics.get(idx));
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
-                cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
-                    d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
+                    d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
             }
         }
         catch (IgniteCheckedException e) {
@@ -722,13 +721,15 @@ public class GridDhtPartitionDemander {
         GridCacheEntryInfo entry,
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
-        cctx.shared().database().checkpointReadLock();
+        ctx.database().checkpointReadLock();
 
         try {
             GridCacheEntryEx cached = null;
 
             try {
-                cached = cctx.dht().entryEx(entry.key());
+                GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext();
+
+                cached = cctx.dhtCache().entryEx(entry.key());
 
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
@@ -779,10 +780,10 @@ public class GridDhtPartitionDemander {
         }
         catch (IgniteCheckedException e) {
             throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+                ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
+            ctx.database().checkpointReadUnlock();
         }
 
         return true;
@@ -798,16 +799,10 @@ public class GridDhtPartitionDemander {
      */
     public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 1L;
-
-        /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
-        private final AtomicBoolean startedEvtSent;
-
-        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
-        private final AtomicBoolean stoppedEvtSent;
+        private final GridCacheSharedContext<?, ?> ctx;
 
         /** */
-        private final GridCacheContext<?, ?> cctx;
+        private final CacheGroupContext grp;
 
         /** */
         private final IgniteLogger log;
@@ -829,40 +824,37 @@ public class GridDhtPartitionDemander {
         private final long updateSeq;
 
         /**
+         * @param grp Cache group.
          * @param assigns Assigns.
-         * @param cctx Context.
          * @param log Logger.
-         * @param startedEvtSent Start event sent flag.
-         * @param stoppedEvtSent Stop event sent flag.
          * @param updateSeq Update sequence.
          */
-        RebalanceFuture(GridDhtPreloaderAssignments assigns,
-            GridCacheContext<?, ?> cctx,
+        RebalanceFuture(
+            CacheGroupContext grp,
+            GridDhtPreloaderAssignments assigns,
             IgniteLogger log,
-            AtomicBoolean startedEvtSent,
-            AtomicBoolean stoppedEvtSent,
             long updateSeq) {
             assert assigns != null;
 
-            this.exchFut = assigns.exchangeFuture();
-            this.topVer = assigns.topologyVersion();
-            this.cctx = cctx;
+            exchFut = assigns.exchangeFuture();
+            topVer = assigns.topologyVersion();
+
+            this.grp = grp;
             this.log = log;
-            this.startedEvtSent = startedEvtSent;
-            this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
+
+            ctx= grp.shared();
         }
 
         /**
          * Dummy future. Will be done by real one.
          */
-        public RebalanceFuture() {
+        RebalanceFuture() {
             this.exchFut = null;
             this.topVer = null;
-            this.cctx = null;
+            this.ctx = null;
+            this.grp = null;
             this.log = null;
-            this.startedEvtSent = null;
-            this.stoppedEvtSent = null;
             this.updateSeq = -1;
         }
 
@@ -900,7 +892,7 @@ public class GridDhtPartitionDemander {
 
                 U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
 
-                if (!cctx.kernalContext().isStopping()) {
+                if (!ctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
                         cleanupRemoteContexts(nodeId);
                 }
@@ -921,7 +913,7 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                U.log(log, ("Cancelled rebalancing [cache=" + grp.cacheOrGroupName() +
                     ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
@@ -956,22 +948,24 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          */
         private void cleanupRemoteContexts(UUID nodeId) {
-            ClusterNode node = cctx.discovery().node(nodeId);
+            ClusterNode node = ctx.discovery().node(nodeId);
 
             if (node == null)
                 return;
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+                -1/* remove supply context signal */,
+                this.topologyVersion(),
+                grp.groupId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             try {
-                for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+                for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
                     d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                        d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
                 }
             }
             catch (IgniteCheckedException ignored) {
@@ -989,20 +983,19 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                        exchFut.discoveryEvent());
+                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent());
 
                 T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
-                assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + "]";
 
                 Collection<Integer> parts = t.get2();
 
                 boolean rmvd = parts.remove(p);
 
-                assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + ", left=" + parts + "]";
 
                 if (parts.isEmpty()) {
@@ -1022,18 +1015,18 @@ public class GridDhtPartitionDemander {
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) {
             assert discoEvt != null;
 
-            cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+            grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
         }
 
         /**
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-            preloadEvent(-1, type, discoEvt);
+        private void rebalanceEvent(int type, DiscoveryEvent discoEvt) {
+            rebalanceEvent(-1, type, discoEvt);
         }
 
         /**
@@ -1053,7 +1046,7 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1067,13 +1060,13 @@ public class GridDhtPartitionDemander {
 
                     onDone(false); //Finished but has missed partitions, will force dummy exchange
 
-                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                    ctx.exchange().forceDummyExchange(true, exchFut);
 
                     return;
                 }
 
-                if (!cancelled && !cctx.preloader().syncFuture().isDone())
-                    ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                if (!cancelled && !grp.preloader().syncFuture().isDone())
+                    ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 onDone(!cancelled);
             }
@@ -1083,24 +1076,16 @@ public class GridDhtPartitionDemander {
          *
          */
         private void sendRebalanceStartedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
-                (!cctx.isReplicated() || !startedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-
-                startedEvtSent.set(true);
-            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
         }
 
         /**
          *
          */
         private void sendRebalanceFinishedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
-                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                stoppedEvtSent.set(true);
-            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index f7f0aff..afdeb8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -26,7 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
@@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  */
 class GridDhtPartitionSupplier {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final CacheGroupContext grp;
 
     /** */
     private final IgniteLogger log;
@@ -65,18 +66,18 @@ class GridDhtPartitionSupplier {
     private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+    GridDhtPartitionSupplier(CacheGroupContext grp) {
+        assert grp != null;
 
-        this.cctx = cctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
+        log = grp.shared().logger(getClass());
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
-        depEnabled = cctx.gridDeploy().enabled();
+        depEnabled = grp.shared().gridDeploy().enabled();
     }
 
     /**
@@ -171,7 +172,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
         T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
@@ -197,9 +198,12 @@ class GridDhtPartitionSupplier {
                 ", from=" + id + ", idx=" + idx + "]");
 
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
-            d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+            d.updateSequence(),
+            grp.groupId(),
+            d.topologyVersion(),
+            grp.deploymentEnabled());
 
-        ClusterNode node = cctx.discovery().node(id);
+        ClusterNode node = grp.shared().discovery().node(id);
 
         if (node == null)
             return; // Context will be cleaned at topology change.
@@ -225,7 +229,7 @@ class GridDhtPartitionSupplier {
 
             boolean newReq = true;
 
-            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+            long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
 
             if (sctx != null) {
                 phase = sctx.phase;
@@ -234,7 +238,7 @@ class GridDhtPartitionSupplier {
             }
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                    log.debug("Starting supplying rebalancing [cache=" + grp.cacheOrGroupName() +
                         ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
                         ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                         ", idx=" + idx + "]");
@@ -280,7 +284,7 @@ class GridDhtPartitionSupplier {
                         IgniteRebalanceIterator iter;
 
                         if (sctx == null || sctx.entryIt == null) {
-                            iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
+                            iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
 
                             if (!iter.historical())
                                 s.clean(part);
@@ -289,7 +293,9 @@ class GridDhtPartitionSupplier {
                             iter = (IgniteRebalanceIterator)sctx.entryIt;
 
                         while (iter.hasNext()) {
-                            if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
+                            List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
+
+                            if (!nodes.contains(node)) {
                                 // Demander no longer needs this partition,
                                 // so we send '-1' partition and move on.
                                 s.missed(part);
@@ -313,7 +319,7 @@ class GridDhtPartitionSupplier {
                                 break;
                             }
 
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            if (s.messageSize() >= grp.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
                                     saveSupplyContext(scId,
                                         phase,
@@ -335,9 +341,9 @@ class GridDhtPartitionSupplier {
                                         return;
 
                                     s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
-                                        cctx.cacheId(),
+                                        grp.groupId(),
                                         d.topologyVersion(),
-                                        cctx.deploymentEnabled());
+                                        grp.deploymentEnabled());
                                 }
                             }
 
@@ -349,9 +355,10 @@ class GridDhtPartitionSupplier {
                             info.expireTime(row.expireTime());
                             info.version(row.version());
                             info.value(row.value());
+                            info.cacheId(row.cacheId());
 
                             if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry0(part, info, cctx);
+                                s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
                             else {
                                 if (log.isDebugEnabled())
                                     log.debug("Rebalance predicate evaluated to false (will not send " +
@@ -400,7 +407,7 @@ class GridDhtPartitionSupplier {
             reply(node, d, s, scId);
 
             if (log.isDebugEnabled())
-                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                log.debug("Finished supplying rebalancing [cache=" + grp.cacheOrGroupName() +
                     ", fromNode=" + node.id() +
                     ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                     ", idx=" + idx + "]");
@@ -427,16 +434,15 @@ class GridDhtPartitionSupplier {
         GridDhtPartitionSupplyMessage s,
         T3<UUID, Integer, AffinityTopologyVersion> scId)
         throws IgniteCheckedException {
-
         try {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+            grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout());
 
             // Throttle preloading.
-            if (cctx.config().getRebalanceThrottle() > 0)
-                U.sleep(cctx.config().getRebalanceThrottle());
+            if (grp.config().getRebalanceThrottle() > 0)
+                U.sleep(grp.config().getRebalanceThrottle());
 
             return true;
         }
@@ -469,7 +475,7 @@ class GridDhtPartitionSupplier {
         AffinityTopologyVersion topVer,
         long updateSeq) {
         synchronized (scMap) {
-            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+            if (grp.affinity().lastVersion().equals(topVer)) {
                 assert scMap.get(t) == null;
 
                 scMap.put(t,

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 903a7da..f8d4344 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -31,10 +31,11 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -45,7 +46,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Partition supply message.
  */
-public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -79,18 +80,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
     /**
      * @param updateSeq Update sequence for this node.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
     GridDhtPartitionSupplyMessage(long updateSeq,
-        int cacheId,
+        int grpId,
         AffinityTopologyVersion topVer,
         boolean addDepInfo) {
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;
-        this.addDepInfo = addDepInfo;    }
+        this.addDepInfo = addDepInfo;
+    }
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -203,18 +205,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     /**
      * @param p Partition.
      * @param info Entry to add.
-     * @param ctx Cache context.
+     * @param ctx Cache shared context.
+     * @param cacheObjCtx Cache object context.
      * @throws IgniteCheckedException If failed.
      */
-    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
         assert info.value() != null : info;
 
         // Need to call this method to initialize info properly.
-        marshalInfo(info, ctx);
+        marshalInfo(info, ctx, cacheObjCtx);
 
-        msgSize += info.marshalledSize(ctx);
+        msgSize += info.marshalledSize(cacheObjCtx);
 
         CacheEntryInfoCollection infoCol = infos().get(p);
 
@@ -234,13 +237,13 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+        CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
 
         for (CacheEntryInfoCollection col : infos().values()) {
             List<GridCacheEntryInfo> entries = col.infos();
 
             for (int i = 0; i < entries.size(); i++)
-                entries.get(i).unmarshal(cacheCtx, ldr);
+                entries.get(i).unmarshal(grp.cacheObjectContext(), ldr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 74bbcb0..441952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return GridIoMessage.STRIPE_DISABLED_PART;
     }
@@ -87,10 +92,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @return Parition update counters.
      */
-    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId);
+    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
 
     /**
      * @return Last used version among all nodes.
@@ -114,6 +119,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -128,19 +138,19 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeMessage("exchId", exchId))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeMessage("lastVer", lastVer))
                     return false;
 
@@ -162,7 +172,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 exchId = reader.readMessage("exchId");
 
                 if (!reader.isLastRead())
@@ -170,7 +180,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -178,7 +188,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 lastVer = reader.readMessage("lastVer");
 
                 if (!reader.isLastRead())


Mime
View raw message