ignite-commits mailing list archives

From: sboi...@apache.org
Subject: [08/18] ignite git commit: ignite-5075 'logical' caches sharing the same 'physical' cache group
Date: Sun, 04 Jun 2017 08:03:07 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index c4c57a7..977e9ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -43,7 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Force keys response. Contains absent keys.
  */
-public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtForceKeysResponse extends GridCacheIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -168,7 +168,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
 
         if (infos != null) {
             for (GridCacheEntryInfo info : infos)
-                info.marshal(cctx);
+                info.marshal(cctx.cacheObjectContext());
         }
 
         if (err != null && errBytes == null)
@@ -186,7 +186,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
 
         if (infos != null) {
             for (GridCacheEntryInfo info : infos)
-                info.unmarshal(cctx, ldr);
+                info.unmarshal(cctx.cacheObjectContext(), ldr);
         }
 
         if (errBytes != null && err == null)
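
The hunks above switch the base class to GridCacheIdMessage and make per-entry (un)marshalling take only the cache object context instead of a full GridCacheContext, so entries from any 'logical' cache in a shared group can reuse one context. Below is a minimal standalone sketch of that dependency narrowing; the class names (ObjectCtx, EntryInfo) are illustrative stand-ins, not the Ignite types, and the byte-level encoding is an assumption made only to keep the example compilable.

    import java.nio.charset.StandardCharsets;

    // Stand-in for CacheObjectContext: the only piece the entry actually needs.
    class ObjectCtx {
        byte[] marshal(String val) { return val.getBytes(StandardCharsets.UTF_8); }
        String unmarshal(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
    }

    // Stand-in for GridCacheEntryInfo: before the patch it required a full cache
    // context; now every cache in a shared group can pass the same ObjectCtx.
    class EntryInfo {
        private String val = "value";
        private byte[] valBytes;

        void marshal(ObjectCtx objCtx) { valBytes = objCtx.marshal(val); }
        void unmarshal(ObjectCtx objCtx) { val = objCtx.unmarshal(valBytes); }
    }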

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index ef6a3b9..4a693bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.NotNull;
 /**
  * Partition demand request.
  */
-public class GridDhtPartitionDemandMessage extends GridCacheMessage {
+public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -77,10 +77,10 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     /**
      * @param updateSeq Update sequence for this node.
      * @param topVer Topology version.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      */
-    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
-        this.cacheId = cacheId;
+    GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int grpId) {
+        this.grpId = grpId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;
     }
@@ -91,7 +91,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
      */
     GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts,
         Map<Integer, Long> partsCntrs) {
-        cacheId = cp.cacheId;
+        grpId = cp.grpId;
         updateSeq = cp.updateSeq;
         topic = cp.topic;
         timeout = cp.timeout;
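
For context, the demand message now identifies a whole cache group (grpId) rather than a single cache, so one request per supplier covers every 'logical' cache in that group. A hedged standalone model follows; DemandMsg is a hypothetical name, not the Ignite class.

    import java.util.Arrays;
    import java.util.Collection;

    // Hypothetical model: one demand per cache *group* rather than per cache.
    class DemandMsg {
        final int grpId;                      // was a cacheId before ignite-5075
        final Collection<Integer> parts;      // partitions requested from this supplier

        DemandMsg(int grpId, Collection<Integer> parts) {
            this.grpId = grpId;
            this.parts = parts;
        }

        public static void main(String[] args) {
            // Caches that share physical group 42 are rebalanced by this single request.
            DemandMsg d = new DemandMsg(42, Arrays.asList(0, 1, 2));
            System.out.println("grp=" + d.grpId + ", parts=" + d.parts);
        }
    }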

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 3c04617..cda24e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -38,13 +37,17 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -78,7 +81,10 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
 @SuppressWarnings("NonConstantFieldWithUpperCaseName")
 public class GridDhtPartitionDemander {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final GridCacheSharedContext<?, ?> ctx;
+
+    /** */
+    private final CacheGroupContext grp;
 
     /** */
     private final IgniteLogger log;
@@ -104,30 +110,20 @@ public class GridDhtPartitionDemander {
     private final Map<Integer, Object> rebalanceTopics;
 
     /**
-     * Started event sent.
-     * Make sense for replicated cache only.
+     * @param grp Cache group.
      */
-    private final AtomicBoolean startedEvtSent = new AtomicBoolean();
+    public GridDhtPartitionDemander(CacheGroupContext grp) {
+        assert grp != null;
 
-    /**
-     * Stopped event sent.
-     * Make sense for replicated cache only.
-     */
-    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
+        this.grp = grp;
 
-    /**
-     * @param cctx Cctx.
-     */
-    public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
-
-        this.cctx = cctx;
+        ctx = grp.shared();
 
-        log = cctx.logger(getClass());
+        log = ctx.logger(getClass());
 
-        boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
+        boolean enabled = grp.rebalanceEnabled() && !ctx.kernalContext().clientNode();
 
-        rebalanceFut = new RebalanceFuture();//Dummy.
+        rebalanceFut = new RebalanceFuture(); //Dummy.
 
         if (!enabled) {
             // Calling onDone() immediately since preloading is disabled.
@@ -137,7 +133,7 @@ public class GridDhtPartitionDemander {
 
         Map<Integer, Object> tops = new HashMap<>();
 
-        for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
+        for (int idx = 0; idx < grp.shared().kernalContext().config().getRebalanceThreadPoolSize(); idx++)
             tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
         rebalanceTopics = tops;
@@ -196,7 +192,7 @@ public class GridDhtPartitionDemander {
         GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
 
         if (obj != null)
-            cctx.time().removeTimeoutObject(obj);
+            ctx.time().removeTimeoutObject(obj);
 
         final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -208,7 +204,7 @@ public class GridDhtPartitionDemander {
 
             exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                 @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
-                    IgniteInternalFuture<Boolean> fut0 = cctx.shared().exchange().forceRebalance(exchFut);
+                    IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut);
 
                     fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                         @Override public void apply(IgniteInternalFuture<Boolean> future) {
@@ -237,7 +233,7 @@ public class GridDhtPartitionDemander {
      */
     private boolean topologyChanged(RebalanceFuture fut) {
         return
-            !cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
+            !grp.affinity().lastVersion().equals(fut.topologyVersion()) || // Topology already changed.
                 fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
     }
 
@@ -268,12 +264,12 @@ public class GridDhtPartitionDemander {
 
         assert force == (forcedRebFut != null);
 
-        long delay = cctx.config().getRebalanceDelay();
+        long delay = grp.config().getRebalanceDelay();
 
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
+            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -302,16 +298,18 @@ public class GridDhtPartitionDemander {
 
             fut.sendRebalanceStartedEvent();
 
-            final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.config().isStatisticsEnabled()) {
+                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
 
-            if (statsEnabled) {
-                cctx.cache().metrics0().clearRebalanceCounters();
+                    metrics.clearRebalanceCounters();
 
-                rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
-                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                        cctx.cache().metrics0().clearRebalanceCounters();
-                    }
-                });
+                    rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                        @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                            metrics.clearRebalanceCounters();
+                        }
+                    });
+                }
             }
 
             if (assigns.cancelled()) { // Pending exchange.
@@ -331,7 +329,7 @@ public class GridDhtPartitionDemander {
 
                 fut.onDone(true);
 
-                ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 fut.sendRebalanceFinishedEvent();
 
@@ -362,7 +360,7 @@ public class GridDhtPartitionDemander {
             GridTimeoutObject obj = lastTimeoutObj.get();
 
             if (obj != null)
-                cctx.time().removeTimeoutObject(obj);
+                ctx.time().removeTimeoutObject(obj);
 
             final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
 
@@ -372,7 +370,7 @@ public class GridDhtPartitionDemander {
                 @Override public void onTimeout() {
                     exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
-                            cctx.shared().exchange().forceRebalance(exchFut);
+                            ctx.exchange().forceRebalance(exchFut);
                         }
                     });
                 }
@@ -380,7 +378,7 @@ public class GridDhtPartitionDemander {
 
             lastTimeoutObj.set(obj);
 
-            cctx.time().addTimeoutObject(obj);
+            ctx.time().addTimeoutObject(obj);
         }
 
         return null;
@@ -389,7 +387,6 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Rebalance future.
      * @param assigns Assignments.
-     * @throws IgniteCheckedException If failed.
      */
     private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assigns) {
         assert fut != null;
@@ -411,17 +408,19 @@ public class GridDhtPartitionDemander {
 
                 Collection<Integer> parts = e.getValue().partitions();
 
-                assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
+                assert parts != null : "Partitions are null [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId + "]";
 
                 fut.remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
             }
         }
 
+        final CacheConfiguration cfg = grp.config();
+
+        int lsnrCnt = ctx.gridConfig().getRebalanceThreadPoolSize();
+
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             final ClusterNode node = e.getKey();
 
-            final CacheConfiguration cfg = cctx.config();
-
             final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
 
             GridDhtPartitionDemandMessage d = e.getValue();
@@ -430,8 +429,6 @@ public class GridDhtPartitionDemander {
                 ", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
                 ", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
 
-            int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
-
             final List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
 
             for (int cnt = 0; cnt < lsnrCnt; cnt++)
@@ -451,16 +448,15 @@ public class GridDhtPartitionDemander {
 
                     initD.topic(rebalanceTopics.get(cnt));
                     initD.updateSequence(fut.updateSeq);
-                    initD.timeout(cctx.config().getRebalanceTimeout());
+                    initD.timeout(cfg.getRebalanceTimeout());
 
                     final int finalCnt = cnt;
 
-                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                    ctx.kernalContext().closure().runLocalSafe(new Runnable() {
                         @Override public void run() {
                             try {
                                 if (!fut.isDone()) {
-                                    cctx.io().sendOrderedMessage(node,
-                                        rebalanceTopics.get(finalCnt), initD, cctx.ioPolicy(), initD.timeout());
+                                    ctx.io().sendOrderedMessage(node, rebalanceTopics.get(finalCnt), initD, grp.ioPolicy(), initD.timeout());
 
                                     // Cleanup required in case partitions demanded in parallel with cancellation.
                                     synchronized (fut) {
@@ -507,11 +503,11 @@ public class GridDhtPartitionDemander {
 
         for (Integer part : parts) {
             try {
-                if (cctx.shared().database().persistenceEnabled()) {
+                if (ctx.database().persistenceEnabled()) {
                     if (partCntrs == null)
                         partCntrs = new HashMap<>(parts.size(), 1.0f);
 
-                    GridDhtLocalPartition p = cctx.topology().localPartition(part, old.topologyVersion(), false);
+                    GridDhtLocalPartition p = grp.topology().localPartition(part, old.topologyVersion(), false);
 
                     partCntrs.put(part, p.initialUpdateCounter());
                 }
@@ -587,7 +583,7 @@ public class GridDhtPartitionDemander {
 
         final RebalanceFuture fut = rebalanceFut;
 
-        ClusterNode node = cctx.node(id);
+        ClusterNode node = ctx.node(id);
 
         if (node == null)
             return;
@@ -611,23 +607,42 @@ public class GridDhtPartitionDemander {
             return;
         }
 
-        final GridDhtPartitionTopology top = cctx.dht().topology();
+        final GridDhtPartitionTopology top = grp.topology();
+
+        if (grp.sharedGroup()) {
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.config().isStatisticsEnabled()) {
+                    long keysCnt = supply.keysForCache(cctx.cacheId());
 
-        final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+                    if (keysCnt != -1)
+                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
 
-        if (statsEnabled) {
-            if (supply.estimatedKeysCount() != -1)
-                cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
+                    // Can not be calculated per cache.
+                    cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+                }
+            }
+        }
+        else {
+            GridCacheContext cctx = grp.singleCacheContext();
 
-            cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+            if (cctx.config().isStatisticsEnabled()) {
+                if (supply.estimatedKeysCount() != -1)
+                    cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
+
+                cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+            }
         }
 
         try {
+            AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
+
+            GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext();
+
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
 
-                if (cctx.affinity().partitionLocalNode(p, topVer)) {
+                if (aff.get(p).contains(ctx.localNode())) {
                     GridDhtLocalPartition part = top.localPartition(p, topVer, true);
 
                     assert part != null;
@@ -638,7 +653,7 @@ public class GridDhtPartitionDemander {
                         boolean reserved = part.reserve();
 
                         assert reserved : "Failed to reserve partition [igniteInstanceName=" +
-                            cctx.igniteInstanceName() + ", cacheName=" + cctx.name() + ", part=" + part + ']';
+                            ctx.igniteInstanceName() + ", grp=" + grp.cacheOrGroupName() + ", part=" + part + ']';
 
                         part.lock();
 
@@ -662,7 +677,10 @@ public class GridDhtPartitionDemander {
                                     break;
                                 }
 
-                                if (statsEnabled)
+                                if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId()))
+                                    cctx = ctx.cacheContext(entry.cacheId());
+
+                                if (cctx != null && cctx.config().isStatisticsEnabled())
                                     cctx.cache().metrics0().onRebalanceKeyReceived();
                             }
 
@@ -700,7 +718,7 @@ public class GridDhtPartitionDemander {
 
             // Only request partitions based on latest topology version.
             for (Integer miss : supply.missed()) {
-                if (cctx.affinity().partitionLocalNode(miss, topVer))
+                if (aff.get(miss).contains(ctx.localNode()))
                     fut.partitionMissed(id, miss);
             }
 
@@ -708,16 +726,18 @@ public class GridDhtPartitionDemander {
                 fut.partitionDone(id, miss);
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
+                supply.updateSequence(),
+                supply.topologyVersion(),
+                grp.groupId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             d.topic(rebalanceTopics.get(idx));
 
             if (!topologyChanged(fut) && !fut.isDone()) {
                 // Send demand message.
-                cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
-                    d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                ctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
+                    d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
             }
         }
         catch (IgniteCheckedException e) {
@@ -746,11 +766,15 @@ public class GridDhtPartitionDemander {
         GridCacheEntryInfo entry,
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
+        ctx.database().checkpointReadLock();
+
         try {
             GridCacheEntryEx cached = null;
 
             try {
-                cached = cctx.dht().entryEx(entry.key());
+                GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext();
+
+                cached = cctx.dhtCache().entryEx(entry.key());
 
                 if (log.isDebugEnabled())
                     log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
@@ -808,7 +832,10 @@ public class GridDhtPartitionDemander {
         }
         catch (IgniteCheckedException e) {
             throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
-                cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+                ctx.localNode() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
+        }
+        finally {
+            ctx.database().checkpointReadUnlock();
         }
 
         return true;
@@ -824,16 +851,10 @@ public class GridDhtPartitionDemander {
      */
     public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 1L;
-
-        /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
-        private final AtomicBoolean startedEvtSent;
-
-        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
-        private final AtomicBoolean stoppedEvtSent;
+        private final GridCacheSharedContext<?, ?> ctx;
 
         /** */
-        private final GridCacheContext<?, ?> cctx;
+        private final CacheGroupContext grp;
 
         /** */
         private final IgniteLogger log;
@@ -855,42 +876,38 @@ public class GridDhtPartitionDemander {
         private final long updateSeq;
 
         /**
+         * @param grp Cache group.
          * @param assigns Assigns.
-         * @param cctx Context.
          * @param log Logger.
-         * @param startedEvtSent Start event sent flag.
-         * @param stoppedEvtSent Stop event sent flag.
          * @param updateSeq Update sequence.
          */
-        RebalanceFuture(GridDhtPreloaderAssignments assigns,
-            GridCacheContext<?, ?> cctx,
+        RebalanceFuture(
+            CacheGroupContext grp,
+            GridDhtPreloaderAssignments assigns,
             IgniteLogger log,
-            AtomicBoolean startedEvtSent,
-            AtomicBoolean stoppedEvtSent,
             long updateSeq) {
             assert assigns != null;
 
             exchFut = assigns.exchangeFuture();
             topVer = assigns.topologyVersion();
 
-            this.cctx = cctx;
+            this.grp = grp;
             this.log = log;
-            this.startedEvtSent = startedEvtSent;
-            this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
+
+            ctx = grp.shared();
         }
 
         /**
          * Dummy future. Will be done by real one.
          */
-        public RebalanceFuture() {
-            exchFut = null;
-            topVer = null;
-            cctx = null;
-            log = null;
-            startedEvtSent = null;
-            stoppedEvtSent = null;
-            updateSeq = -1;
+        RebalanceFuture() {
+            this.exchFut = null;
+            this.topVer = null;
+            this.ctx = null;
+            this.grp = null;
+            this.log = null;
+            this.updateSeq = -1;
         }
 
         /**
@@ -927,7 +944,7 @@ public class GridDhtPartitionDemander {
 
                 U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
 
-                if (!cctx.kernalContext().isStopping()) {
+                if (!ctx.kernalContext().isStopping()) {
                     for (UUID nodeId : remaining.keySet())
                         cleanupRemoteContexts(nodeId);
                 }
@@ -948,7 +965,7 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
+                U.log(log, ("Cancelled rebalancing [cache=" + grp.cacheOrGroupName() +
                     ", fromNode=" + nodeId + ", topology=" + topologyVersion() +
                     ", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
 
@@ -983,22 +1000,24 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          */
         private void cleanupRemoteContexts(UUID nodeId) {
-            ClusterNode node = cctx.discovery().node(nodeId);
+            ClusterNode node = ctx.discovery().node(nodeId);
 
             if (node == null)
                 return;
 
             GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-                -1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
+                -1/* remove supply context signal */,
+                this.topologyVersion(),
+                grp.groupId());
 
-            d.timeout(cctx.config().getRebalanceTimeout());
+            d.timeout(grp.config().getRebalanceTimeout());
 
             try {
-                for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
+                for (int idx = 0; idx < ctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
                     d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
 
-                    cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
-                        d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
+                    ctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
+                        d, grp.ioPolicy(), grp.config().getRebalanceTimeout());
                 }
             }
             catch (IgniteCheckedException ignored) {
@@ -1016,20 +1035,19 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-                        exchFut.discoveryEvent());
+                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent());
 
                 T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
-                assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert t != null : "Remaining not found [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + "]";
 
                 Collection<Integer> parts = t.get2();
 
                 boolean rmvd = parts.remove(p);
 
-                assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
+                assert rmvd : "Partition already done [grp=" + grp.name() + ", fromNode=" + nodeId +
                     ", part=" + p + ", left=" + parts + "]";
 
                 if (parts.isEmpty()) {
@@ -1049,18 +1067,18 @@ public class GridDhtPartitionDemander {
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) {
             assert discoEvt != null;
 
-            cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+            grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
         }
 
         /**
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-            preloadEvent(-1, type, discoEvt);
+        private void rebalanceEvent(int type, DiscoveryEvent discoEvt) {
+            rebalanceEvent(-1, type, discoEvt);
         }
 
         /**
@@ -1080,7 +1098,7 @@ public class GridDhtPartitionDemander {
                 if (log.isDebugEnabled())
                     log.debug("Completed rebalance future: " + this);
 
-                cctx.shared().exchange().scheduleResendPartitions();
+                ctx.exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1094,13 +1112,13 @@ public class GridDhtPartitionDemander {
 
                     onDone(false); //Finished but has missed partitions, will force dummy exchange
 
-                    cctx.shared().exchange().forceDummyExchange(true, exchFut);
+                    ctx.exchange().forceDummyExchange(true, exchFut);
 
                     return;
                 }
 
-                if (!cancelled && !cctx.preloader().syncFuture().isDone())
-                    ((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
+                if (!cancelled && !grp.preloader().syncFuture().isDone())
+                    ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
 
                 onDone(!cancelled);
             }
@@ -1110,24 +1128,16 @@ public class GridDhtPartitionDemander {
          *
          */
         private void sendRebalanceStartedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
-                (!cctx.isReplicated() || !startedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-
-                startedEvtSent.set(true);
-            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
         }
 
         /**
          *
          */
         private void sendRebalanceFinishedEvent() {
-            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
-                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
-                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-
-                stoppedEvtSent.set(true);
-            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
         }
 
         /** {@inheritDoc} */
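
The demander changes above replace the single statsEnabled flag with per-cache handling: rebalance metrics are cleared and updated for every statistics-enabled cache in the group, and during preload the owning cache is re-resolved from each entry's cacheId when the group is shared. A standalone sketch of that dispatch pattern, using hypothetical names (Entry, keysPerCache) rather than the Ignite types:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PreloadDispatchDemo {
        static class Entry {
            final int cacheId; final String key;
            Entry(int cacheId, String key) { this.cacheId = cacheId; this.key = key; }
        }

        public static void main(String[] args) {
            List<Entry> batch = Arrays.asList(
                new Entry(101, "a"), new Entry(101, "b"), new Entry(102, "c"));

            Map<Integer, Integer> keysPerCache = new HashMap<>();
            Integer cur = null; // stands in for the cached per-cache context

            for (Entry e : batch) {
                // Re-resolve only when the entry belongs to a different cache than the
                // previous one (mirrors ctx.cacheContext(entry.cacheId()) in the diff).
                if (cur == null || cur != e.cacheId)
                    cur = e.cacheId;

                keysPerCache.merge(cur, 1, Integer::sum); // per-cache "key received" metric
            }

            System.out.println(keysPerCache); // {101=2, 102=1}
        }
    }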

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0ff03f7..467b906 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -26,7 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
@@ -47,7 +48,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  */
 class GridDhtPartitionSupplier {
     /** */
-    private final GridCacheContext<?, ?> cctx;
+    private final CacheGroupContext grp;
 
     /** */
     private final IgniteLogger log;
@@ -65,18 +66,18 @@ class GridDhtPartitionSupplier {
     private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
 
     /**
-     * @param cctx Cache context.
+     * @param grp Cache group.
      */
-    GridDhtPartitionSupplier(GridCacheContext<?, ?> cctx) {
-        assert cctx != null;
+    GridDhtPartitionSupplier(CacheGroupContext grp) {
+        assert grp != null;
 
-        this.cctx = cctx;
+        this.grp = grp;
 
-        log = cctx.logger(getClass());
+        log = grp.shared().logger(getClass());
 
-        top = cctx.dht().topology();
+        top = grp.topology();
 
-        depEnabled = cctx.gridDeploy().enabled();
+        depEnabled = grp.shared().gridDeploy().enabled();
     }
 
     /**
@@ -171,7 +172,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        AffinityTopologyVersion cutTop = cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion cutTop = grp.affinity().lastVersion();
         AffinityTopologyVersion demTop = d.topologyVersion();
 
         T3<UUID, Integer, AffinityTopologyVersion> scId = new T3<>(id, idx, demTop);
@@ -197,9 +198,12 @@ class GridDhtPartitionSupplier {
                 ", from=" + id + ", idx=" + idx + "]");
 
         GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(
-            d.updateSequence(), cctx.cacheId(), d.topologyVersion(), cctx.deploymentEnabled());
+            d.updateSequence(),
+            grp.groupId(),
+            d.topologyVersion(),
+            grp.deploymentEnabled());
 
-        ClusterNode node = cctx.discovery().node(id);
+        ClusterNode node = grp.shared().discovery().node(id);
 
         if (node == null)
             return; // Context will be cleaned at topology change.
@@ -225,7 +229,7 @@ class GridDhtPartitionSupplier {
 
             boolean newReq = true;
 
-            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
+            long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount();
 
             if (sctx != null) {
                 phase = sctx.phase;
@@ -234,7 +238,7 @@ class GridDhtPartitionSupplier {
             }
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Starting supplying rebalancing [cache=" + cctx.name() +
+                    log.debug("Starting supplying rebalancing [cache=" + grp.cacheOrGroupName() +
                         ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() +
                         ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                         ", idx=" + idx + "]");
@@ -243,18 +247,19 @@ class GridDhtPartitionSupplier {
             Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
             if (sctx == null) {
-                long keysCnt = 0;
-
                 for (Integer part : d.partitions()) {
                     GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
                     if (loc == null || loc.state() != OWNING)
                         continue;
 
-                    keysCnt += cctx.offheap().entriesCount(part);
+                    if (grp.sharedGroup()) {
+                        for (int cacheId : grp.cacheIds())
+                            s.addKeysForCache(cacheId, grp.offheap().cacheEntriesCount(cacheId, part));
+                    }
+                    else
+                        s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
                 }
-
-                s.estimatedKeysCount(keysCnt);
             }
 
             while ((sctx != null && newReq) || partIt.hasNext()) {
@@ -295,22 +300,24 @@ class GridDhtPartitionSupplier {
                         IgniteRebalanceIterator iter;
 
                         if (sctx == null || sctx.entryIt == null) {
-                            iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(),
+                            iter = grp.offheap().rebalanceIterator(part, d.topologyVersion(),
                                 d.isHistorical(part) ? d.partitionCounter(part) : null);
 
                             if (!iter.historical()) {
-                                assert !cctx.shared().database().persistenceEnabled() || !d.isHistorical(part);
+                                assert !grp.shared().database().persistenceEnabled() || !d.isHistorical(part);
 
                                 s.clean(part);
                             }
                             else
-                                assert cctx.shared().database().persistenceEnabled() && d.isHistorical(part);
+                                assert grp.shared().database().persistenceEnabled() && d.isHistorical(part);
                         }
                         else
                             iter = (IgniteRebalanceIterator)sctx.entryIt;
 
                         while (iter.hasNext()) {
-                            if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
+                            List<ClusterNode> nodes = grp.affinity().cachedAffinity(d.topologyVersion()).get(part);
+
+                            if (!nodes.contains(node)) {
                                 // Demander no longer needs this partition,
                                 // so we send '-1' partition and move on.
                                 s.missed(part);
@@ -334,7 +341,7 @@ class GridDhtPartitionSupplier {
                                 break;
                             }
 
-                            if (s.messageSize() >= cctx.config().getRebalanceBatchSize()) {
+                            if (s.messageSize() >= grp.config().getRebalanceBatchSize()) {
                                 if (++bCnt >= maxBatchesCnt) {
                                     saveSupplyContext(scId,
                                         phase,
@@ -356,9 +363,9 @@ class GridDhtPartitionSupplier {
                                         return;
 
                                     s = new GridDhtPartitionSupplyMessage(d.updateSequence(),
-                                        cctx.cacheId(),
+                                        grp.groupId(),
                                         d.topologyVersion(),
-                                        cctx.deploymentEnabled());
+                                        grp.deploymentEnabled());
                                 }
                             }
 
@@ -370,9 +377,10 @@ class GridDhtPartitionSupplier {
                             info.expireTime(row.expireTime());
                             info.version(row.version());
                             info.value(row.value());
+                            info.cacheId(row.cacheId());
 
                             if (preloadPred == null || preloadPred.apply(info))
-                                s.addEntry0(part, info, cctx);
+                                s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());
                             else {
                                 if (log.isDebugEnabled())
                                     log.debug("Rebalance predicate evaluated to false (will not send " +
@@ -421,7 +429,7 @@ class GridDhtPartitionSupplier {
             reply(node, d, s, scId);
 
             if (log.isDebugEnabled())
-                log.debug("Finished supplying rebalancing [cache=" + cctx.name() +
+                log.debug("Finished supplying rebalancing [cache=" + grp.cacheOrGroupName() +
                     ", fromNode=" + node.id() +
                     ", topology=" + d.topologyVersion() + ", updateSeq=" + d.updateSequence() +
                     ", idx=" + idx + "]");
@@ -448,16 +456,15 @@ class GridDhtPartitionSupplier {
         GridDhtPartitionSupplyMessage s,
         T3<UUID, Integer, AffinityTopologyVersion> scId)
         throws IgniteCheckedException {
-
         try {
             if (log.isDebugEnabled())
                 log.debug("Replying to partition demand [node=" + n.id() + ", demand=" + d + ", supply=" + s + ']');
 
-            cctx.io().sendOrderedMessage(n, d.topic(), s, cctx.ioPolicy(), d.timeout());
+            grp.shared().io().sendOrderedMessage(n, d.topic(), s, grp.ioPolicy(), d.timeout());
 
             // Throttle preloading.
-            if (cctx.config().getRebalanceThrottle() > 0)
-                U.sleep(cctx.config().getRebalanceThrottle());
+            if (grp.config().getRebalanceThrottle() > 0)
+                U.sleep(grp.config().getRebalanceThrottle());
 
             return true;
         }
@@ -490,7 +497,7 @@ class GridDhtPartitionSupplier {
         AffinityTopologyVersion topVer,
         long updateSeq) {
         synchronized (scMap) {
-            if (cctx.affinity().affinityTopologyVersion().equals(topVer)) {
+            if (grp.affinity().lastVersion().equals(topVer)) {
                 assert scMap.get(t) == null;
 
                 scMap.put(t,
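
In the supplier, the single keysCnt accumulator is replaced by a branch: for a shared group the entry count of each (cacheId, partition) pair is reported separately via addKeysForCache, otherwise one total is accumulated as before. A standalone sketch of that split, with made-up numbers standing in for the offheap entry counters:

    import java.util.HashMap;
    import java.util.Map;

    class SupplierEstimateDemo {
        public static void main(String[] args) {
            boolean sharedGroup = true;
            int[] cacheIds = {101, 102};
            long[] entriesPerCache = {500, 250}; // per-cache entry counts in one partition

            Map<Integer, Long> perCache = new HashMap<>(); // mirrors addKeysForCache(...)
            long total = 0;                                // mirrors addEstimatedKeysCount(...)

            if (sharedGroup) {
                for (int i = 0; i < cacheIds.length; i++)
                    perCache.merge(cacheIds[i], entriesPerCache[i], Long::sum);
            }
            else {
                for (long c : entriesPerCache)
                    total += c;
            }

            System.out.println(sharedGroup ? perCache.toString() : Long.toString(total));
        }
    }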

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 1cb32e3..ef14a90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -28,13 +28,13 @@ import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -45,7 +45,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Partition supply message.
  */
-public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements GridCacheDeployable {
+public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -79,17 +79,21 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     /** Estimated keys count. */
     private long estimatedKeysCnt = -1;
 
+    /** Estimated keys count per cache in case the message is for a shared group. */
+    @GridDirectMap(keyType = int.class, valueType = long.class)
+    private Map<Integer, Long> keysPerCache;
+
     /**
      * @param updateSeq Update sequence for this node.
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
      * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
     GridDhtPartitionSupplyMessage(long updateSeq,
-        int cacheId,
+        int grpId,
         AffinityTopologyVersion topVer,
         boolean addDepInfo) {
-        this.cacheId = cacheId;
+        this.grpId = grpId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;
         this.addDepInfo = addDepInfo;
@@ -206,18 +210,19 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     /**
      * @param p Partition.
      * @param info Entry to add.
-     * @param ctx Cache context.
+     * @param ctx Cache shared context.
+     * @param cacheObjCtx Cache object context.
      * @throws IgniteCheckedException If failed.
      */
-    void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException {
+    void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
         assert info.value() != null : info;
 
         // Need to call this method to initialize info properly.
-        marshalInfo(info, ctx);
+        marshalInfo(info, ctx, cacheObjCtx);
 
-        msgSize += info.marshalledSize(ctx);
+        msgSize += info.marshalledSize(cacheObjCtx);
 
         CacheEntryInfoCollection infoCol = infos().get(p);
 
@@ -237,13 +242,13 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        GridCacheContext cacheCtx = ctx.cacheContext(cacheId);
+        CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
 
         for (CacheEntryInfoCollection col : infos().values()) {
             List<GridCacheEntryInfo> entries = col.infos();
 
             for (int i = 0; i < entries.size(); i++)
-                entries.get(i).unmarshal(cacheCtx, ldr);
+                entries.get(i).unmarshal(grp.cacheObjectContext(), ldr);
         }
     }
 
@@ -281,46 +286,53 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                if (!writer.writeMap("infos", infos, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                if (!writer.writeMap("keysPerCache", keysPerCache, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeLong("updateSeq", updateSeq))
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeLong("estimatedKeysCnt", estimatedKeysCnt))
+                if (!writer.writeInt("msgSize", msgSize))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeInt("msgSize", msgSize))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
+
+            case 11:
+                if (!writer.writeLong("updateSeq", updateSeq))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -346,7 +358,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 4:
-                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+                estimatedKeysCnt = reader.readLong("estimatedKeysCnt");
 
                 if (!reader.isLastRead())
                     return false;
@@ -354,7 +366,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 5:
-                last = reader.readCollection("last", MessageCollectionItemType.INT);
+                infos = reader.readMap("infos", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -362,7 +374,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 6:
-                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+                keysPerCache = reader.readMap("keysPerCache", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -370,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 7:
-                topVer = reader.readMessage("topVer");
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -378,7 +390,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 8:
-                updateSeq = reader.readLong("updateSeq");
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -386,7 +398,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 9:
-                estimatedKeysCnt = reader.readLong("estimatedKeysCnt");
+                msgSize = reader.readInt("msgSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -394,12 +406,21 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
                 reader.incrementState();
 
             case 10:
-                msgSize = reader.readInt("msgSize");
+                topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
+            case 11:
+                updateSeq = reader.readLong("updateSeq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtPartitionSupplyMessage.class);
@@ -412,7 +433,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /**
@@ -423,10 +444,43 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G
     }
 
     /**
-     * @param estimatedKeysCnt New estimated keys count.
+     * @param cnt Keys count to add.
      */
-    public void estimatedKeysCount(long estimatedKeysCnt) {
-        this.estimatedKeysCnt = estimatedKeysCnt;
+    public void addEstimatedKeysCount(long cnt) {
+        this.estimatedKeysCnt += cnt;
+    }
+
+    /**
+     * @return Estimated keys count for a given cache ID.
+     */
+    public long keysForCache(int cacheId) {
+        if (this.keysPerCache == null)
+            return -1;
+
+        Long cnt = this.keysPerCache.get(cacheId);
+
+        return cnt != null ? cnt : 0;
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param cnt Keys count.
+     */
+    public void addKeysForCache(int cacheId, long cnt) {
+        assert cacheId != 0 && cnt >= 0;
+
+        if (keysPerCache == null)
+            keysPerCache = new HashMap<>();
+
+        Long cnt0 = keysPerCache.get(cacheId);
+
+        if (cnt0 == null) {
+            keysPerCache.put(cacheId, cnt);
+
+            msgSize += 12;
+        }
+        else
+            keysPerCache.put(cacheId, cnt0 + cnt);
     }
 
     /** {@inheritDoc} */
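
The new keysPerCache map and its accessors above are plain bookkeeping: keysForCache() returns -1 when no per-cache data was recorded at all, 0 for a cache that is tracked but contributed no keys, and the 12 bytes added to msgSize per new entry presumably account for one int key (4 bytes) plus one long value (8 bytes). A self-contained restatement of that logic (the class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    class KeysPerCacheDemo {
        private Map<Integer, Long> keysPerCache;   // cacheId -> estimated key count
        private int msgSize;

        void addKeysForCache(int cacheId, long cnt) {
            if (keysPerCache == null)
                keysPerCache = new HashMap<>();

            Long prev = keysPerCache.get(cacheId);

            if (prev == null) {
                keysPerCache.put(cacheId, cnt);
                msgSize += 12;                     // 4-byte int key + 8-byte long value
            }
            else
                keysPerCache.put(cacheId, prev + cnt);
        }

        long keysForCache(int cacheId) {
            if (keysPerCache == null)
                return -1;                         // no per-cache data recorded

            Long cnt = keysPerCache.get(cacheId);
            return cnt != null ? cnt : 0;
        }

        public static void main(String[] args) {
            KeysPerCacheDemo m = new KeysPerCacheDemo();
            m.addKeysForCache(1, 100);             // partition 0 of cache 1
            m.addKeysForCache(1, 50);              // partition 1 of cache 1
            System.out.println(m.keysForCache(1)); // 150
            System.out.println(m.keysForCache(2)); // 0 (tracked group, unknown cache)
        }
    }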

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e45010b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 74bbcb0..441952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -65,6 +65,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean cacheGroupMessage() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return GridIoMessage.STRIPE_DISABLED_PART;
     }
@@ -87,10 +92,10 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /**
-     * @param cacheId Cache ID.
+     * @param grpId Cache group ID.
     * @return Partition update counters.
      */
-    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int cacheId);
+    public abstract Map<Integer, T2<Long, Long>> partitionUpdateCounters(int grpId);
 
     /**
      * @return Last used version among all nodes.
@@ -114,6 +119,11 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
     }
 
     /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -128,19 +138,19 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
         }
 
         switch (writer.state()) {
-            case 3:
+            case 2:
                 if (!writer.writeMessage("exchId", exchId))
                     return false;
 
                 writer.incrementState();
 
-            case 4:
+            case 3:
                 if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
-            case 5:
+            case 4:
                 if (!writer.writeMessage("lastVer", lastVer))
                     return false;
 
@@ -162,7 +172,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
             return false;
 
         switch (reader.state()) {
-            case 3:
+            case 2:
                 exchId = reader.readMessage("exchId");
 
                 if (!reader.isLastRead())
@@ -170,7 +180,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
 
                 reader.incrementState();
 
-            case 4:
+            case 3:
                 flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
@@ -178,7 +188,7 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage
 
                 reader.incrementState();
 
-            case 5:
+            case 4:
                 lastVer = reader.readMessage("lastVer");
 
                 if (!reader.isLastRead())
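
Because the cacheId field moved out of this base message, its serialized field count drops to 5 and the exchId/flags/lastVer slots shift from writer states 3-5 to 2-4. The fall-through switch with incrementState() is a resumable write: each field owns one index, and a partial write picks up at the failed field on the next attempt. A simplified, standalone model of that pattern (indices 0-2 here stand in for the real 2-4, and the raw ByteBuffer encoding is an assumption made only so the example runs):

    import java.nio.ByteBuffer;

    class StateMachineWriteDemo {
        private int state;                              // mirrors writer.state()

        boolean writeTo(ByteBuffer buf, long exchId, byte flags, long lastVer) {
            switch (state) {
                case 0:                                 // "exchId" (case 2 in the real message)
                    if (buf.remaining() < 8) return false;
                    buf.putLong(exchId);
                    state++;

                case 1:                                 // "flags" (case 3)
                    if (buf.remaining() < 1) return false;
                    buf.put(flags);
                    state++;

                case 2:                                 // "lastVer" (case 4)
                    if (buf.remaining() < 8) return false;
                    buf.putLong(lastVer);
                    state++;
            }
            return true;                                // all fields written
        }

        public static void main(String[] args) {
            StateMachineWriteDemo msg = new StateMachineWriteDemo();
            ByteBuffer small = ByteBuffer.allocate(8);  // room for the first field only
            System.out.println(msg.writeTo(small, 1L, (byte)0, 2L)); // false: out of space
            ByteBuffer rest = ByteBuffer.allocate(16);
            System.out.println(msg.writeTo(rest, 1L, (byte)0, 2L));  // true: resumes at 'flags'
        }
    }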

